commit 7e85bb0fcc4b5a050602111607ace2843cba64ba Author: Axodouble Date: Tue May 12 06:07:16 2026 +0000 Initial structure diff --git a/cmd/qu/main.go b/cmd/qu/main.go new file mode 100644 index 0000000..ec0d630 --- /dev/null +++ b/cmd/qu/main.go @@ -0,0 +1,21 @@ +// Package main is the entry point for the qu binary. +// +// qu is a quorum-based uptime monitor. Multiple cooperating nodes +// run identical copies of this binary; they elect a master that +// owns alert dispatch and check aggregation while every node +// independently probes the configured targets. +package main + +import ( + "fmt" + "os" + + "github.com/jasper/quptime/internal/cli" +) + +func main() { + if err := cli.NewRootCommand().Execute(); err != nil { + fmt.Fprintln(os.Stderr, "error:", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e2a0982 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module github.com/jasper/quptime + +go 1.23 + +require ( + github.com/spf13/cobra v1.8.1 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/prometheus-community/pro-bing v0.4.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8ba4924 --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/prometheus-community/pro-bing v0.4.1 h1:aMaJwyifHZO0y+h8+icUz0xbToHbia0wdmzdVZ+Kl3w= +github.com/prometheus-community/pro-bing v0.4.1/go.mod h1:aLsw+zqCaDoa2RLVVSX3+UiCkBBXTMtZC3c7EkfWnAE= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/alerts/discord.go b/internal/alerts/discord.go new file mode 100644 index 0000000..22973ac --- /dev/null +++ 
b/internal/alerts/discord.go @@ -0,0 +1,56 @@ +package alerts + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/jasper/quptime/internal/config" +) + +// discordTimeout caps how long a single webhook POST is allowed to +// take. +const discordTimeout = 10 * time.Second + +// discordPayload is the minimum shape the Discord webhook API +// accepts. We do not use embeds — plain text keeps the payload +// trivial to read in operator-side logs. +type discordPayload struct { + Content string `json:"content"` +} + +// sendDiscord posts msg.Subject + body to the configured webhook URL. +func sendDiscord(a *config.Alert, msg Message) error { + if a.DiscordWebhook == "" { + return errors.New("discord webhook url not set") + } + + content := msg.Subject + "\n```\n" + msg.Body + "\n```" + raw, err := json.Marshal(discordPayload{Content: content}) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), discordTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.DiscordWebhook, bytes.NewReader(raw)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("discord webhook: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("discord webhook status %d: %s", resp.StatusCode, string(body)) + } + return nil +} diff --git a/internal/alerts/dispatcher.go b/internal/alerts/dispatcher.go new file mode 100644 index 0000000..462de8a --- /dev/null +++ b/internal/alerts/dispatcher.go @@ -0,0 +1,69 @@ +package alerts + +import ( + "fmt" + "log" + + "github.com/jasper/quptime/internal/checks" + "github.com/jasper/quptime/internal/config" +) + +// Dispatcher fans an aggregator transition out to every alert listed +// on the check. Errors are logged but never propagated: alerting must +// not block the aggregation pipeline. +type Dispatcher struct { + cluster *config.ClusterConfig + selfID string + logger *log.Logger +} + +// New constructs a Dispatcher. +func New(cluster *config.ClusterConfig, selfID string, logger *log.Logger) *Dispatcher { + if logger == nil { + logger = log.Default() + } + return &Dispatcher{cluster: cluster, selfID: selfID, logger: logger} +} + +// OnTransition is wired as checks.TransitionFn. +func (d *Dispatcher) OnTransition(check *config.Check, from, to checks.State, snap checks.Snapshot) { + if to == checks.StateUnknown { + return + } + msg := Render(d.selfID, check, from, to, snap) + for _, alertID := range check.AlertIDs { + alert, _ := d.cluster.FindAlert(alertID) + if alert == nil { + d.logger.Printf("alerts: check %q references unknown alert %q", check.Name, alertID) + continue + } + if err := d.dispatchOne(alert, msg); err != nil { + d.logger.Printf("alerts: %q via %s: %v", alert.Name, alert.Type, err) + } + } +} + +// Test sends a one-shot test message to the named alert. Returns an +// error so the CLI can surface failures interactively. 
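+// A typical invocation is `qu alert test <id-or-name>`, assuming the
+// daemon routes its alert-test control request through to this method.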
+func (d *Dispatcher) Test(alertID string) error { + alert, _ := d.cluster.FindAlert(alertID) + if alert == nil { + return fmt.Errorf("alert %q not found", alertID) + } + msg := Message{ + Subject: "[quptime] test alert", + Body: fmt.Sprintf("This is a test of alert %q from node %s.\nIf you see this, the alert channel is wired correctly.\n", alert.Name, d.selfID), + } + return d.dispatchOne(alert, msg) +} + +func (d *Dispatcher) dispatchOne(a *config.Alert, msg Message) error { + switch a.Type { + case config.AlertSMTP: + return sendSMTP(a, msg) + case config.AlertDiscord: + return sendDiscord(a, msg) + default: + return fmt.Errorf("unknown alert type %q", a.Type) + } +} diff --git a/internal/alerts/message.go b/internal/alerts/message.go new file mode 100644 index 0000000..2429777 --- /dev/null +++ b/internal/alerts/message.go @@ -0,0 +1,53 @@ +// Package alerts dispatches state-transition notifications to the +// configured channels (SMTP, Discord). The aggregator owns hysteresis +// so this package fires exactly one message per UP↔DOWN flip. +package alerts + +import ( + "fmt" + "strings" + "time" + + "github.com/jasper/quptime/internal/checks" + "github.com/jasper/quptime/internal/config" +) + +// Message is the rendered notification ready to ship across any +// channel. Channels may format Subject + Body differently (SMTP uses +// both; Discord renders a single string). +type Message struct { + Subject string + Body string +} + +// Render produces a human-readable message from one state transition. +func Render(nodeID string, check *config.Check, from, to checks.State, snap checks.Snapshot) Message { + now := time.Now().UTC().Format(time.RFC3339) + verb := transitionVerb(from, to) + subject := fmt.Sprintf("[quptime] %s %s — %s", check.Name, verb, check.Target) + + var b strings.Builder + fmt.Fprintf(&b, "Check %q is now %s.\n", check.Name, strings.ToUpper(string(to))) + fmt.Fprintf(&b, "Previous state: %s\n", from) + fmt.Fprintf(&b, "Target: %s (%s)\n", check.Target, check.Type) + fmt.Fprintf(&b, "Reports: %d (ok=%d, fail=%d)\n", snap.Reports, snap.OKCount, snap.NotOK) + if snap.Detail != "" { + fmt.Fprintf(&b, "Detail: %s\n", snap.Detail) + } + fmt.Fprintf(&b, "Master: %s\n", nodeID) + fmt.Fprintf(&b, "When: %s\n", now) + return Message{Subject: subject, Body: b.String()} +} + +func transitionVerb(from, to checks.State) string { + switch to { + case checks.StateDown: + return "DOWN" + case checks.StateUp: + if from == checks.StateDown { + return "RECOVERED" + } + return "UP" + } + return strings.ToUpper(string(to)) +} diff --git a/internal/alerts/smtp.go b/internal/alerts/smtp.go new file mode 100644 index 0000000..da60864 --- /dev/null +++ b/internal/alerts/smtp.go @@ -0,0 +1,78 @@ +package alerts + +import ( + "crypto/tls" + "errors" + "fmt" + "net/smtp" + "strings" + + "github.com/jasper/quptime/internal/config" +) + +// sendSMTP delivers msg through the alert's SMTP relay. STARTTLS is +// negotiated whenever the alert has SMTPStartTLS true; the smtp +// server is responsible for advertising the extension. 
+func sendSMTP(a *config.Alert, msg Message) error { + if a.SMTPHost == "" || a.SMTPPort == 0 { + return errors.New("smtp host/port not set") + } + if a.SMTPFrom == "" || len(a.SMTPTo) == 0 { + return errors.New("smtp from/to not set") + } + + addr := fmt.Sprintf("%s:%d", a.SMTPHost, a.SMTPPort) + client, err := smtp.Dial(addr) + if err != nil { + return fmt.Errorf("dial smtp: %w", err) + } + defer client.Close() + + if a.SMTPStartTLS { + if ok, _ := client.Extension("STARTTLS"); !ok { + return errors.New("server does not support STARTTLS") + } + if err := client.StartTLS(&tls.Config{ServerName: a.SMTPHost, MinVersion: tls.VersionTLS12}); err != nil { + return fmt.Errorf("starttls: %w", err) + } + } + + if a.SMTPUser != "" { + auth := smtp.PlainAuth("", a.SMTPUser, a.SMTPPassword, a.SMTPHost) + if err := client.Auth(auth); err != nil { + return fmt.Errorf("smtp auth: %w", err) + } + } + + if err := client.Mail(a.SMTPFrom); err != nil { + return fmt.Errorf("mail from: %w", err) + } + for _, rcpt := range a.SMTPTo { + if err := client.Rcpt(rcpt); err != nil { + return fmt.Errorf("rcpt %s: %w", rcpt, err) + } + } + w, err := client.Data() + if err != nil { + return fmt.Errorf("data: %w", err) + } + if _, err := w.Write(buildRFC822(a.SMTPFrom, a.SMTPTo, msg)); err != nil { + return fmt.Errorf("write body: %w", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("close body: %w", err) + } + return client.Quit() +} + +func buildRFC822(from string, to []string, msg Message) []byte { + var sb strings.Builder + fmt.Fprintf(&sb, "From: %s\r\n", from) + fmt.Fprintf(&sb, "To: %s\r\n", strings.Join(to, ", ")) + fmt.Fprintf(&sb, "Subject: %s\r\n", msg.Subject) + fmt.Fprintf(&sb, "MIME-Version: 1.0\r\n") + fmt.Fprintf(&sb, "Content-Type: text/plain; charset=UTF-8\r\n") + fmt.Fprintf(&sb, "\r\n") + sb.WriteString(msg.Body) + return []byte(sb.String()) +} diff --git a/internal/checks/aggregator.go b/internal/checks/aggregator.go new file mode 100644 index 0000000..660ea5a --- /dev/null +++ b/internal/checks/aggregator.go @@ -0,0 +1,253 @@ +package checks + +import ( + "sync" + "time" + + "github.com/jasper/quptime/internal/config" +) + +// State is the aggregate verdict on one check. +type State string + +const ( + StateUnknown State = "unknown" + StateUp State = "up" + StateDown State = "down" +) + +// HysteresisCount is how many consecutive evaluations a candidate +// state must hold before becoming the new committed state. 2 matches +// the design doc ("down for ≥2 consecutive aggregate evaluations"). +const HysteresisCount = 2 + +// TransitionFn is fired when a check's committed state flips. The +// alert dispatcher is the production consumer. +type TransitionFn func(check *config.Check, from, to State, snap Snapshot) + +// Snapshot summarises one check's current aggregate state. +type Snapshot struct { + CheckID string + State State + Reports int // number of fresh per-node results + OKCount int // number reporting OK + NotOK int // number reporting not-OK + Detail string + UpdateAt time.Time +} + +// Aggregator runs on the master only. Other nodes ship their probe +// results to it and it decides the cluster-wide truth. +type Aggregator struct { + cluster *config.ClusterConfig + transition TransitionFn + + mu sync.Mutex + perCheck map[string]*checkState +} + +type checkState struct { + // nodeID → most recent result we've received from that node. 
+ latest map[string]nodeResult + + committed State // last announced state (held through hysteresis) + candidate State // state being considered for promotion + consecutive int // ticks the candidate has persisted +} + +type nodeResult struct { + OK bool + Detail string + At time.Time +} + +// NewAggregator returns an empty aggregator. fn may be nil during +// startup; SetTransition can wire it later. +func NewAggregator(cluster *config.ClusterConfig, fn TransitionFn) *Aggregator { + return &Aggregator{ + cluster: cluster, + transition: fn, + perCheck: map[string]*checkState{}, + } +} + +// SetTransition wires (or rewires) the transition callback. +func (a *Aggregator) SetTransition(fn TransitionFn) { + a.mu.Lock() + a.transition = fn + a.mu.Unlock() +} + +// Submit records one node's result for one check and immediately +// re-evaluates that check's aggregate state. +func (a *Aggregator) Submit(nodeID string, r Result) { + if r.CheckID == "" { + return + } + a.mu.Lock() + st, ok := a.perCheck[r.CheckID] + if !ok { + st = &checkState{ + latest: map[string]nodeResult{}, + committed: StateUnknown, + candidate: StateUnknown, + } + a.perCheck[r.CheckID] = st + } + st.latest[nodeID] = nodeResult{OK: r.OK, Detail: r.Detail, At: r.Timestamp} + a.mu.Unlock() + a.evaluate(r.CheckID) +} + +// SnapshotAll returns the current aggregate view of every known check. +func (a *Aggregator) SnapshotAll() map[string]Snapshot { + a.mu.Lock() + defer a.mu.Unlock() + out := make(map[string]Snapshot, len(a.perCheck)) + for id, st := range a.perCheck { + out[id] = a.snapshotLocked(id, st) + } + return out +} + +// SnapshotFor returns the aggregate for a single check. +func (a *Aggregator) SnapshotFor(checkID string) (Snapshot, bool) { + a.mu.Lock() + defer a.mu.Unlock() + st, ok := a.perCheck[checkID] + if !ok { + return Snapshot{}, false + } + return a.snapshotLocked(checkID, st), true +} + +func (a *Aggregator) evaluate(checkID string) { + check := a.lookupCheck(checkID) + if check == nil { + // Check was removed from cluster.yaml — drop its state so we + // don't keep alerting on something the operator deleted. 
+ a.mu.Lock() + delete(a.perCheck, checkID) + a.mu.Unlock() + return + } + + a.mu.Lock() + st, ok := a.perCheck[checkID] + if !ok { + a.mu.Unlock() + return + } + + freshWindow := freshWindowFor(check.Interval) + cutoff := time.Now().Add(-freshWindow) + + var ok_, notOK int + var lastDetail string + for _, nr := range st.latest { + if nr.At.Before(cutoff) { + continue + } + if nr.OK { + ok_++ + } else { + notOK++ + if lastDetail == "" { + lastDetail = nr.Detail + } + } + } + + var candidate State + switch { + case ok_+notOK == 0: + candidate = StateUnknown + case notOK > ok_: + candidate = StateDown + default: + candidate = StateUp + } + + if candidate == st.candidate { + st.consecutive++ + } else { + st.candidate = candidate + st.consecutive = 1 + } + + var fireFrom, fireTo State + var fired bool + if candidate != st.committed && st.consecutive >= HysteresisCount { + fireFrom = st.committed + fireTo = candidate + st.committed = candidate + fired = true + } + + snap := a.snapshotLocked(checkID, st) + fn := a.transition + a.mu.Unlock() + + if fired && fn != nil { + fn(check, fireFrom, fireTo, snap) + } +} + +func (a *Aggregator) snapshotLocked(checkID string, st *checkState) Snapshot { + check := a.lookupCheck(checkID) + freshWindow := 60 * time.Second + if check != nil { + freshWindow = freshWindowFor(check.Interval) + } + cutoff := time.Now().Add(-freshWindow) + + var ok_, notOK int + var detail string + for _, nr := range st.latest { + if nr.At.Before(cutoff) { + continue + } + if nr.OK { + ok_++ + } else { + notOK++ + if detail == "" { + detail = nr.Detail + } + } + } + return Snapshot{ + CheckID: checkID, + State: st.committed, + Reports: ok_ + notOK, + OKCount: ok_, + NotOK: notOK, + Detail: detail, + UpdateAt: time.Now().UTC(), + } +} + +func (a *Aggregator) lookupCheck(id string) *config.Check { + snap := a.cluster.Snapshot() + for i := range snap.Checks { + if snap.Checks[i].ID == id { + c := snap.Checks[i] + return &c + } + } + return nil +} + +// freshWindowFor returns the staleness threshold for a check given +// its configured interval. Anything older than this is considered too +// stale to count. +func freshWindowFor(interval time.Duration) time.Duration { + if interval <= 0 { + return 60 * time.Second + } + w := interval * 3 + if w < 30*time.Second { + w = 30 * time.Second + } + return w +} diff --git a/internal/checks/http.go b/internal/checks/http.go new file mode 100644 index 0000000..c31de2c --- /dev/null +++ b/internal/checks/http.go @@ -0,0 +1,62 @@ +package checks + +import ( + "context" + "crypto/tls" + "io" + "net/http" + "strings" + "time" + + "github.com/jasper/quptime/internal/config" +) + +// maxBodyRead is the cap on how much body a check will pull when +// BodyMatch is non-empty. Anything beyond is ignored. 
+const maxBodyRead = 1 << 20 // 1 MiB
+
+type httpProber struct{}
+
+func (httpProber) Probe(ctx context.Context, c *config.Check) Result {
+	client := &http.Client{
+		Timeout: c.Timeout,
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
+		},
+	}
+	start := time.Now()
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Target, nil)
+	if err != nil {
+		return Result{OK: false, Detail: "build request: " + err.Error()}
+	}
+	req.Header.Set("User-Agent", "quptime/1.0")
+	resp, err := client.Do(req)
+	if err != nil {
+		return Result{OK: false, Detail: err.Error(), Latency: time.Since(start)}
+	}
+	defer resp.Body.Close()
+
+	expected := c.ExpectStatus
+	if expected == 0 {
+		expected = 200
+	}
+	if resp.StatusCode != expected {
+		return Result{
+			OK:      false,
+			Detail:  "status " + resp.Status,
+			Latency: time.Since(start),
+		}
+	}
+
+	if c.BodyMatch != "" {
+		body, err := io.ReadAll(io.LimitReader(resp.Body, maxBodyRead))
+		if err != nil {
+			return Result{OK: false, Detail: "read body: " + err.Error(), Latency: time.Since(start)}
+		}
+		if !strings.Contains(string(body), c.BodyMatch) {
+			return Result{OK: false, Detail: "body match miss", Latency: time.Since(start)}
+		}
+	}
+
+	return Result{OK: true, Latency: time.Since(start)}
+}
diff --git a/internal/checks/icmp.go b/internal/checks/icmp.go
new file mode 100644
index 0000000..34ccbda
--- /dev/null
+++ b/internal/checks/icmp.go
@@ -0,0 +1,48 @@
+package checks
+
+import (
+	"context"
+	"runtime"
+	"time"
+
+	probing "github.com/prometheus-community/pro-bing"
+
+	"github.com/jasper/quptime/internal/config"
+)
+
+type icmpProber struct{}
+
+// Probe sends a single ICMP echo. On Linux the pinger runs unprivileged
+// ("udp" mode) so the daemon does not need CAP_NET_RAW, provided the
+// net.ipv4.ping_group_range sysctl allows it; elsewhere it uses raw sockets.
+func (icmpProber) Probe(ctx context.Context, c *config.Check) Result {
+	start := time.Now()
+	pinger, err := probing.NewPinger(c.Target)
+	if err != nil {
+		return Result{OK: false, Detail: "resolve: " + err.Error()}
+	}
+	pinger.Count = 1
+	pinger.Timeout = c.Timeout
+	if pinger.Timeout <= 0 {
+		pinger.Timeout = 5 * time.Second
+	}
+	pinger.SetPrivileged(runtime.GOOS != "linux")
+
+	doneCh := make(chan struct{})
+	go func() {
+		_ = pinger.RunWithContext(ctx)
+		close(doneCh)
+	}()
+	select {
+	case <-doneCh:
+	case <-ctx.Done():
+		pinger.Stop()
+		return Result{OK: false, Detail: "timeout", Latency: time.Since(start)}
+	}
+
+	stats := pinger.Statistics()
+	if stats.PacketsRecv == 0 {
+		return Result{OK: false, Detail: "no reply", Latency: time.Since(start)}
+	}
+	return Result{OK: true, Latency: stats.AvgRtt}
+}
diff --git a/internal/checks/probe.go b/internal/checks/probe.go
new file mode 100644
index 0000000..320f1e5
--- /dev/null
+++ b/internal/checks/probe.go
@@ -0,0 +1,65 @@
+// Package checks runs the configured HTTP/TCP/ICMP probes on every
+// node and aggregates per-check results on the master.
+//
+// Probers are the small "do one round of work" units. The Scheduler
+// drives them on a per-check timer and ships each result back to the
+// master via the inter-node transport. The Aggregator (master only)
+// folds incoming results into per-check sliding windows and decides
+// when a check has crossed UP↔DOWN.
+package checks
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/jasper/quptime/internal/config"
+)
+
+// Result is the outcome of a single probe.
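+// A successful HTTP probe, for instance, comes back looking roughly
+// like this (illustrative values; Run stamps CheckID and Timestamp):
+//
+//	Result{CheckID: "…", OK: true, Latency: 83 * time.Millisecond,
+//		Timestamp: time.Now().UTC()}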
+type Result struct { + CheckID string + OK bool + Detail string + Latency time.Duration + Timestamp time.Time +} + +// Prober runs one probe of a configured check. +type Prober interface { + Probe(ctx context.Context, c *config.Check) Result +} + +// Run dispatches to the right Prober for the given check type. Returns +// an error result instead of failing when a check has unknown type. +func Run(ctx context.Context, c *config.Check) Result { + deadline := c.Timeout + if deadline <= 0 { + deadline = 10 * time.Second + } + pctx, cancel := context.WithTimeout(ctx, deadline) + defer cancel() + + var p Prober + switch c.Type { + case config.CheckHTTP: + p = httpProber{} + case config.CheckTCP: + p = tcpProber{} + case config.CheckICMP: + p = icmpProber{} + default: + return Result{ + CheckID: c.ID, + OK: false, + Detail: fmt.Sprintf("unknown check type %q", c.Type), + Timestamp: time.Now().UTC(), + } + } + res := p.Probe(pctx, c) + res.CheckID = c.ID + if res.Timestamp.IsZero() { + res.Timestamp = time.Now().UTC() + } + return res +} diff --git a/internal/checks/scheduler.go b/internal/checks/scheduler.go new file mode 100644 index 0000000..4f5a849 --- /dev/null +++ b/internal/checks/scheduler.go @@ -0,0 +1,147 @@ +package checks + +import ( + "context" + "sync" + "time" + + "github.com/jasper/quptime/internal/config" +) + +// ReconcileInterval is how often the scheduler reconciles its set of +// running probes against cluster.yaml. +const ReconcileInterval = 5 * time.Second + +// Sink is the abstraction the scheduler uses to report results. +// Implemented by the daemon: results go straight to the local +// aggregator when self is the master, otherwise they ship to the +// master over the RPC channel. +type Sink interface { + Submit(Result) +} + +// Scheduler keeps a goroutine alive per configured check. On each +// reconcile pass it starts probes for new checks, stops probes for +// removed checks, and restarts probes whose interval or type changed. +type Scheduler struct { + cluster *config.ClusterConfig + sink Sink + + mu sync.Mutex + running map[string]*probeWorker +} + +type probeWorker struct { + check config.Check + cancel context.CancelFunc +} + +// NewScheduler creates a scheduler bound to the given cluster config. +func NewScheduler(cluster *config.ClusterConfig, sink Sink) *Scheduler { + return &Scheduler{ + cluster: cluster, + sink: sink, + running: map[string]*probeWorker{}, + } +} + +// Start runs the reconcile loop until ctx is cancelled. Reconcile is +// also called immediately on entry so checks start without waiting +// for the first tick. 
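+// The daemon is expected to run this in its own goroutine (e.g.
+// `go sched.Start(ctx)`) and cancel ctx on shutdown.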
+func (s *Scheduler) Start(ctx context.Context) { + s.reconcile(ctx) + t := time.NewTicker(ReconcileInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + s.stopAll() + return + case <-t.C: + s.reconcile(ctx) + } + } +} + +func (s *Scheduler) reconcile(ctx context.Context) { + snap := s.cluster.Snapshot() + want := map[string]config.Check{} + for _, c := range snap.Checks { + if c.ID == "" { + continue + } + want[c.ID] = c + } + + s.mu.Lock() + defer s.mu.Unlock() + + for id, w := range s.running { + desired, stillThere := want[id] + if !stillThere || !sameCheck(desired, w.check) { + w.cancel() + delete(s.running, id) + } + } + for id, c := range want { + if _, exists := s.running[id]; exists { + continue + } + wctx, cancel := context.WithCancel(ctx) + s.running[id] = &probeWorker{check: c, cancel: cancel} + go s.run(wctx, c) + } +} + +func (s *Scheduler) run(ctx context.Context, c config.Check) { + interval := c.Interval + if interval <= 0 { + interval = 30 * time.Second + } + // stagger startup so a freshly-loaded scheduler doesn't burst + // hundreds of probes simultaneously. + jitter := time.Duration(int64(interval) / 10) + if jitter > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(time.Now().UnixNano() % int64(jitter))): + } + } + t := time.NewTicker(interval) + defer t.Stop() + + // fire one immediate probe so state populates without delay. + s.sink.Submit(Run(ctx, &c)) + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + s.sink.Submit(Run(ctx, &c)) + } + } +} + +func (s *Scheduler) stopAll() { + s.mu.Lock() + for id, w := range s.running { + w.cancel() + delete(s.running, id) + } + s.mu.Unlock() +} + +// sameCheck returns true when two Check structs would produce +// identical probing behaviour, so the scheduler can leave the worker +// running across a no-op config push. 
+func sameCheck(a, b config.Check) bool {
+	return a.ID == b.ID &&
+		a.Type == b.Type &&
+		a.Target == b.Target &&
+		a.Interval == b.Interval &&
+		a.Timeout == b.Timeout &&
+		a.ExpectStatus == b.ExpectStatus &&
+		a.BodyMatch == b.BodyMatch
+}
diff --git a/internal/checks/tcp.go b/internal/checks/tcp.go
new file mode 100644
index 0000000..8a73094
--- /dev/null
+++ b/internal/checks/tcp.go
@@ -0,0 +1,22 @@
+package checks
+
+import (
+	"context"
+	"net"
+	"time"
+
+	"github.com/jasper/quptime/internal/config"
+)
+
+type tcpProber struct{}
+
+func (tcpProber) Probe(ctx context.Context, c *config.Check) Result {
+	start := time.Now()
+	d := net.Dialer{Timeout: c.Timeout}
+	conn, err := d.DialContext(ctx, "tcp", c.Target)
+	if err != nil {
+		return Result{OK: false, Detail: err.Error(), Latency: time.Since(start)}
+	}
+	_ = conn.Close()
+	return Result{OK: true, Latency: time.Since(start)}
+}
diff --git a/internal/cli/alert.go b/internal/cli/alert.go
new file mode 100644
index 0000000..fc5d5ae
--- /dev/null
+++ b/internal/cli/alert.go
@@ -0,0 +1,182 @@
+package cli
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/spf13/cobra"
+
+	"github.com/jasper/quptime/internal/config"
+	"github.com/jasper/quptime/internal/daemon"
+	"github.com/jasper/quptime/internal/transport"
+)
+
+func addAlertCmd(root *cobra.Command) {
+	alert := &cobra.Command{
+		Use:   "alert",
+		Short: "Manage notification channels",
+	}
+
+	addParent := &cobra.Command{
+		Use:   "add",
+		Short: "Add a new alert channel",
+	}
+	addParent.AddCommand(buildSMTPAddCmd(), buildDiscordAddCmd())
+
+	listCmd := &cobra.Command{
+		Use:   "list",
+		Short: "List configured alerts",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second)
+			defer cancel()
+			raw, err := callDaemon(ctx, daemon.CtrlStatus, nil)
+			if err != nil {
+				return err
+			}
+			var st transport.StatusResponse
+			if err := json.Unmarshal(raw, &st); err != nil {
+				return err
+			}
+			// The status call above only confirms the daemon is reachable:
+			// its response doesn't carry alerts, and there is no dedicated
+			// read RPC for the full ClusterConfig yet. For v1 we rely on
+			// the CLI being co-located with the daemon's data dir and read
+			// the local cluster.yaml directly so the operator gets
+			// up-to-date output.
+ cluster, err := config.LoadClusterConfig() + if err != nil { + return err + } + for _, a := range cluster.Alerts { + fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", a.ID, a.Type, a.Name) + } + return nil + }, + } + + removeCmd := &cobra.Command{ + Use: "remove ", + Short: "Remove an alert channel", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + payload, _ := json.Marshal(args[0]) + body := daemon.MutateBody{Kind: transport.MutationRemoveAlert, Payload: payload} + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "removed alert %s (cluster version now %d)\n", args[0], res.Version) + return nil + }, + } + + testCmd := &cobra.Command{ + Use: "test ", + Short: "Send a test notification through an alert channel", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second) + defer cancel() + body := daemon.AlertTestBody{AlertID: args[0]} + if _, err := callDaemon(ctx, daemon.CtrlAlertTest, body); err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "test alert sent via %s\n", args[0]) + return nil + }, + } + + alert.AddCommand(addParent, listCmd, removeCmd, testCmd) + root.AddCommand(alert) +} + +func buildSMTPAddCmd() *cobra.Command { + var host, user, password, from string + var port int + var to []string + var startTLS bool + + cmd := &cobra.Command{ + Use: "smtp ", + Short: "Add an SMTP relay alert", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + a := config.Alert{ + ID: uuid.NewString(), + Name: args[0], + Type: config.AlertSMTP, + SMTPHost: host, + SMTPPort: port, + SMTPUser: user, + SMTPPassword: password, + SMTPFrom: from, + SMTPTo: to, + SMTPStartTLS: startTLS, + } + payload, _ := json.Marshal(a) + body := daemon.MutateBody{Kind: transport.MutationAddAlert, Payload: payload} + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "added smtp alert %s id=%s — cluster version %d\n", + a.Name, a.ID, res.Version) + return nil + }, + } + cmd.Flags().StringVar(&host, "host", "", "smtp server host") + cmd.Flags().IntVar(&port, "port", 587, "smtp server port") + cmd.Flags().StringVar(&user, "user", "", "smtp auth user (empty for anonymous)") + cmd.Flags().StringVar(&password, "password", "", "smtp auth password") + cmd.Flags().StringVar(&from, "from", "", "envelope From address") + cmd.Flags().StringSliceVar(&to, "to", nil, "recipient address (repeat or comma-separate)") + cmd.Flags().BoolVar(&startTLS, "starttls", true, "negotiate STARTTLS") + _ = cmd.MarkFlagRequired("host") + _ = cmd.MarkFlagRequired("from") + _ = cmd.MarkFlagRequired("to") + return cmd +} + +func buildDiscordAddCmd() *cobra.Command { + var webhook string + cmd := &cobra.Command{ + Use: "discord ", + Short: "Add a Discord webhook alert", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + a := config.Alert{ + ID: uuid.NewString(), + Name: args[0], + Type: config.AlertDiscord, + DiscordWebhook: webhook, + } 
+ payload, _ := json.Marshal(a) + body := daemon.MutateBody{Kind: transport.MutationAddAlert, Payload: payload} + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "added discord alert %s id=%s — cluster version %d\n", + a.Name, a.ID, res.Version) + return nil + }, + } + cmd.Flags().StringVar(&webhook, "webhook", "", "discord webhook URL") + _ = cmd.MarkFlagRequired("webhook") + return cmd +} diff --git a/internal/cli/check.go b/internal/cli/check.go new file mode 100644 index 0000000..3e7bdde --- /dev/null +++ b/internal/cli/check.go @@ -0,0 +1,174 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/daemon" + "github.com/jasper/quptime/internal/transport" +) + +func addCheckCmd(root *cobra.Command) { + check := &cobra.Command{ + Use: "check", + Short: "Manage configured checks", + } + + addHTTP := buildAddCheckCmd(config.CheckHTTP, "http", " ", + "Add an HTTP/HTTPS check", + func(args []string, c *config.Check) error { + c.Name = args[0] + c.Target = args[1] + return nil + }) + addHTTP.Flags().Int("expect", 200, "HTTP status code that signals UP") + addHTTP.Flags().String("body-match", "", "substring required in response body for UP") + bindHTTPFlags(addHTTP) + + addTCP := buildAddCheckCmd(config.CheckTCP, "tcp", " ", + "Add a TCP-connect check", + func(args []string, c *config.Check) error { + c.Name = args[0] + c.Target = args[1] + return nil + }) + + addICMP := buildAddCheckCmd(config.CheckICMP, "icmp", " ", + "Add an ICMP ping check", + func(args []string, c *config.Check) error { + c.Name = args[0] + c.Target = args[1] + return nil + }) + + addParent := &cobra.Command{ + Use: "add", + Short: "Add a new check", + } + addParent.AddCommand(addHTTP, addTCP, addICMP) + + listCmd := &cobra.Command{ + Use: "list", + Short: "List configured checks and their current aggregate state", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + return runStatusPrintChecks(ctx, cmd) + }, + } + + removeCmd := &cobra.Command{ + Use: "remove ", + Short: "Remove a configured check", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + body := daemon.MutateBody{Kind: transport.MutationRemoveCheck} + payload, _ := json.Marshal(args[0]) + body.Payload = payload + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "removed check %s (cluster version now %d)\n", args[0], res.Version) + return nil + }, + } + + check.AddCommand(addParent, listCmd, removeCmd) + root.AddCommand(check) +} + +// buildAddCheckCmd produces the per-type "qu check add " subcommand. 
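+// For example (hypothetical name, target, and alert):
+//
+//	qu check add http api https://example.com --interval 15s --alerts ops-mail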
+func buildAddCheckCmd(ctype config.CheckType, use, argSpec, short string, + bind func(args []string, c *config.Check) error, +) *cobra.Command { + cmd := &cobra.Command{ + Use: use + " " + argSpec, + Short: short, + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + ch := config.Check{ + ID: uuid.NewString(), + Type: ctype, + } + if err := bind(args, &ch); err != nil { + return err + } + intervalStr, _ := cmd.Flags().GetString("interval") + timeoutStr, _ := cmd.Flags().GetString("timeout") + alertsCSV, _ := cmd.Flags().GetString("alerts") + if intervalStr != "" { + d, err := time.ParseDuration(intervalStr) + if err != nil { + return fmt.Errorf("--interval: %w", err) + } + ch.Interval = d + } else { + ch.Interval = 30 * time.Second + } + if timeoutStr != "" { + d, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("--timeout: %w", err) + } + ch.Timeout = d + } else { + ch.Timeout = 10 * time.Second + } + if alertsCSV != "" { + for _, p := range strings.Split(alertsCSV, ",") { + p = strings.TrimSpace(p) + if p != "" { + ch.AlertIDs = append(ch.AlertIDs, p) + } + } + } + if ctype == config.CheckHTTP { + es, _ := cmd.Flags().GetInt("expect") + bm, _ := cmd.Flags().GetString("body-match") + ch.ExpectStatus = es + ch.BodyMatch = bm + } + + payload, err := json.Marshal(ch) + if err != nil { + return err + } + body := daemon.MutateBody{Kind: transport.MutationAddCheck, Payload: payload} + raw, err := callDaemon(ctx, daemon.CtrlMutate, body) + if err != nil { + return err + } + var res daemon.MutateResult + _ = json.Unmarshal(raw, &res) + fmt.Fprintf(cmd.OutOrStdout(), "added check %s (%s) id=%s — cluster version %d\n", + ch.Name, ch.Type, ch.ID, res.Version) + return nil + }, + } + bindCheckFlags(cmd) + return cmd +} + +func bindCheckFlags(cmd *cobra.Command) { + cmd.Flags().String("interval", "30s", "probe interval") + cmd.Flags().String("timeout", "10s", "per-probe timeout") + cmd.Flags().String("alerts", "", "comma-separated alert IDs/names to notify on transition") +} + +// bindHTTPFlags is a no-op kept to mirror the per-type flag bind sites +// so the caller can extend cleanly later. +func bindHTTPFlags(cmd *cobra.Command) {} diff --git a/internal/cli/cli.go b/internal/cli/cli.go new file mode 100644 index 0000000..2b9ecf9 --- /dev/null +++ b/internal/cli/cli.go @@ -0,0 +1,27 @@ +// Package cli wires every user-facing command on the qu binary. +// +// The root command is built lazily via NewRootCommand so test code +// can construct a fresh tree per invocation. Each subcommand lives +// in its own file (init.go, serve.go, node.go, …) and is attached +// from NewRootCommand below. +package cli + +import "github.com/spf13/cobra" + +// NewRootCommand returns the full cobra tree. 
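+// The top-level commands attached below are: init, serve, node, check,
+// alert, trust, and status.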
+func NewRootCommand() *cobra.Command { + root := &cobra.Command{ + Use: "qu", + Short: "Quorum-based uptime monitor", + SilenceUsage: true, + SilenceErrors: true, + } + addInitCmd(root) + addServeCmd(root) + addNodeCmd(root) + addCheckCmd(root) + addAlertCmd(root) + addTrustCmd(root) + addStatusCmd(root) + return root +} diff --git a/internal/cli/client.go b/internal/cli/client.go new file mode 100644 index 0000000..bfe9004 --- /dev/null +++ b/internal/cli/client.go @@ -0,0 +1,95 @@ +package cli + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "time" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/daemon" +) + +// callDaemon sends one control-plane request and decodes the +// response. Returns the raw body the daemon produced, ready for the +// caller to unmarshal into the per-command result struct. +func callDaemon(ctx context.Context, method string, body any) (json.RawMessage, error) { + var rawBody json.RawMessage + if body != nil { + b, err := json.Marshal(body) + if err != nil { + return nil, err + } + rawBody = b + } + req := daemon.CtrlRequest{Method: method, Body: rawBody} + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, err + } + + conn, err := dialControl(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + + if dl, ok := ctx.Deadline(); ok { + _ = conn.SetDeadline(dl) + } else { + _ = conn.SetDeadline(time.Now().Add(30 * time.Second)) + } + + if err := writeFrame(conn, reqBytes); err != nil { + return nil, err + } + respBytes, err := readFrame(conn) + if err != nil { + return nil, err + } + var resp daemon.CtrlResponse + if err := json.Unmarshal(respBytes, &resp); err != nil { + return nil, err + } + if resp.Error != "" { + return nil, errors.New(resp.Error) + } + return resp.Body, nil +} + +func dialControl(ctx context.Context) (net.Conn, error) { + sock := config.SocketPath() + d := net.Dialer{} + conn, err := d.DialContext(ctx, "unix", sock) + if err != nil { + return nil, fmt.Errorf("dial daemon socket %s: %w", sock, err) + } + return conn, nil +} + +func writeFrame(w io.Writer, body []byte) error { + var hdr [4]byte + binary.BigEndian.PutUint32(hdr[:], uint32(len(body))) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + _, err := w.Write(body) + return err +} + +func readFrame(r io.Reader) ([]byte, error) { + var hdr [4]byte + if _, err := io.ReadFull(r, hdr[:]); err != nil { + return nil, err + } + n := binary.BigEndian.Uint32(hdr[:]) + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, err + } + return buf, nil +} diff --git a/internal/cli/init.go b/internal/cli/init.go new file mode 100644 index 0000000..ef2440a --- /dev/null +++ b/internal/cli/init.go @@ -0,0 +1,58 @@ +package cli + +import ( + "errors" + "fmt" + "os" + + "github.com/google/uuid" + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/crypto" +) + +func addInitCmd(root *cobra.Command) { + var advertise string + var bindAddr string + var bindPort int + + cmd := &cobra.Command{ + Use: "init", + Short: "Generate node identity, keys, and config", + Long: `Initialise a new qu node on this host: pick a UUID, generate an +RSA keypair, write a default node.yaml, and prepare the trust store. + +Idempotent in one direction only: existing key material is never +overwritten. 
Re-run only after wiping the data directory.`, + RunE: func(cmd *cobra.Command, args []string) error { + if err := config.EnsureDataDir(); err != nil { + return err + } + if _, err := os.Stat(config.NodeFilePath()); err == nil { + return errors.New("node.yaml already exists in data dir — refusing to overwrite") + } + nodeID := uuid.NewString() + n := &config.NodeConfig{ + NodeID: nodeID, + BindAddr: bindAddr, + BindPort: bindPort, + Advertise: advertise, + } + if err := n.Save(); err != nil { + return fmt.Errorf("save node.yaml: %w", err) + } + if _, err := crypto.GenerateKeyPair(nodeID); err != nil { + return fmt.Errorf("generate keys: %w", err) + } + fmt.Fprintf(cmd.OutOrStdout(), "initialised node %s\n", nodeID) + fmt.Fprintf(cmd.OutOrStdout(), "data dir: %s\n", config.DataDir()) + fmt.Fprintf(cmd.OutOrStdout(), "advertise: %s\n", n.AdvertiseAddr()) + return nil + }, + } + cmd.Flags().StringVar(&advertise, "advertise", "", "address peers should use to reach this node (host:port)") + cmd.Flags().StringVar(&bindAddr, "bind", "0.0.0.0", "listen address for inter-node traffic") + cmd.Flags().IntVar(&bindPort, "port", 9001, "listen port for inter-node traffic") + root.AddCommand(cmd) +} diff --git a/internal/cli/node.go b/internal/cli/node.go new file mode 100644 index 0000000..b281331 --- /dev/null +++ b/internal/cli/node.go @@ -0,0 +1,108 @@ +package cli + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/daemon" +) + +func addNodeCmd(root *cobra.Command) { + node := &cobra.Command{ + Use: "node", + Short: "Manage cluster membership", + } + + add := &cobra.Command{ + Use: "add ", + Short: "Trust-on-first-use add a peer to this cluster", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second) + defer cancel() + return runNodeAdd(ctx, cmd, args[0]) + }, + } + add.Flags().BoolP("yes", "y", false, "skip interactive confirmation") + node.AddCommand(add) + + list := &cobra.Command{ + Use: "list", + Short: "List configured peers and their last-seen status", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + return runStatusPrint(ctx, cmd, true) + }, + } + node.AddCommand(list) + + remove := &cobra.Command{ + Use: "remove ", + Short: "Remove a peer from the cluster and trust store", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + body := daemon.NodeRemoveBody{NodeID: args[0]} + raw, err := callDaemon(ctx, daemon.CtrlNodeRemove, body) + if err != nil { + return err + } + var res daemon.MutateResult + if len(raw) > 0 { + _ = json.Unmarshal(raw, &res) + } + fmt.Fprintf(cmd.OutOrStdout(), "removed %s (cluster version now %d)\n", args[0], res.Version) + return nil + }, + } + node.AddCommand(remove) + + root.AddCommand(node) +} + +// runNodeAdd does a two-step TOFU: probe peer, confirm fingerprint +// interactively, then issue the actual add. 
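+// A typical session looks roughly like this (hypothetical address,
+// id, and fingerprint):
+//
+//	$ qu node add 203.0.113.7:9001
+//	remote node id : 2f7c…
+//	fingerprint    : …
+//	trust this peer? [y/N] y
+//	added node 2f7c… (cluster version now 2)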
+func runNodeAdd(ctx context.Context, cmd *cobra.Command, addr string) error { + probeBody := daemon.NodeProbeBody{Address: addr} + raw, err := callDaemon(ctx, daemon.CtrlNodeProbe, probeBody) + if err != nil { + return fmt.Errorf("probe %s: %w", addr, err) + } + var probe daemon.NodeProbeResult + if err := json.Unmarshal(raw, &probe); err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "remote node id : %s\n", probe.NodeID) + fmt.Fprintf(cmd.OutOrStdout(), "fingerprint : %s\n", probe.Fingerprint) + + yes, _ := cmd.Flags().GetBool("yes") + if !yes { + fmt.Fprint(cmd.OutOrStdout(), "trust this peer? [y/N] ") + ans, _ := bufio.NewReader(os.Stdin).ReadString('\n') + ans = strings.ToLower(strings.TrimSpace(ans)) + if ans != "y" && ans != "yes" { + return fmt.Errorf("aborted") + } + } + + addBody := daemon.NodeAddBody{Address: addr, Fingerprint: probe.Fingerprint} + raw, err = callDaemon(ctx, daemon.CtrlNodeAdd, addBody) + if err != nil { + return err + } + var res daemon.NodeAddResult + if err := json.Unmarshal(raw, &res); err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "added node %s (cluster version now %d)\n", res.NodeID, res.Version) + return nil +} diff --git a/internal/cli/serve.go b/internal/cli/serve.go new file mode 100644 index 0000000..594cbd9 --- /dev/null +++ b/internal/cli/serve.go @@ -0,0 +1,34 @@ +package cli + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/daemon" +) + +func addServeCmd(root *cobra.Command) { + cmd := &cobra.Command{ + Use: "serve", + Short: "Run the qu daemon in the foreground", + Long: `Run the qu daemon: starts the inter-node listener, the local +control socket for the CLI, the heartbeat loop and the check +scheduler. Stops cleanly on SIGINT or SIGTERM.`, + RunE: func(cmd *cobra.Command, args []string) error { + logger := log.New(os.Stderr, "quptime: ", log.LstdFlags|log.Lmsgprefix) + d, err := daemon.New(logger) + if err != nil { + return err + } + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + return d.Run(ctx) + }, + } + root.AddCommand(cmd) +} diff --git a/internal/cli/status.go b/internal/cli/status.go new file mode 100644 index 0000000..46c2b9e --- /dev/null +++ b/internal/cli/status.go @@ -0,0 +1,101 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "text/tabwriter" + "time" + + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/daemon" + "github.com/jasper/quptime/internal/transport" +) + +func addStatusCmd(root *cobra.Command) { + cmd := &cobra.Command{ + Use: "status", + Short: "Print quorum, master, and check state", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + return runStatusPrint(ctx, cmd, false) + }, + } + root.AddCommand(cmd) +} + +// runStatusPrint fetches /status from the daemon and prints either +// the peer view or the full view depending on peersOnly. 
+func runStatusPrint(ctx context.Context, cmd *cobra.Command, peersOnly bool) error { + raw, err := callDaemon(ctx, daemon.CtrlStatus, nil) + if err != nil { + return err + } + var st transport.StatusResponse + if err := json.Unmarshal(raw, &st); err != nil { + return err + } + out := cmd.OutOrStdout() + fmt.Fprintf(out, "node %s\n", st.NodeID) + fmt.Fprintf(out, "term %d\n", st.Term) + fmt.Fprintf(out, "master %s\n", masterOrNone(st.MasterID)) + fmt.Fprintf(out, "quorum %v (need %d)\n", st.HasQuorum, st.QuorumSize) + fmt.Fprintf(out, "config ver %d\n", st.Version) + + fmt.Fprintln(out) + fmt.Fprintln(out, "PEERS") + tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0) + fmt.Fprintln(tw, "NODE_ID\tADVERTISE\tLIVE\tLAST_SEEN") + for _, p := range st.Peers { + lastSeen := "-" + if !p.LastSeen.IsZero() { + lastSeen = p.LastSeen.Format(time.RFC3339) + } + fmt.Fprintf(tw, "%s\t%s\t%v\t%s\n", p.NodeID, p.Advertise, p.Live, lastSeen) + } + tw.Flush() + + if peersOnly { + return nil + } + + fmt.Fprintln(out) + fmt.Fprintln(out, "CHECKS") + tw2 := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0) + fmt.Fprintln(tw2, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL") + for _, c := range st.Checks { + fmt.Fprintf(tw2, "%s\t%s\t%s\t%d/%d\t%s\n", + c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail) + } + return tw2.Flush() +} + +// runStatusPrintChecks renders only the checks block (used by +// `qu check list`). +func runStatusPrintChecks(ctx context.Context, cmd *cobra.Command) error { + raw, err := callDaemon(ctx, daemon.CtrlStatus, nil) + if err != nil { + return err + } + var st transport.StatusResponse + if err := json.Unmarshal(raw, &st); err != nil { + return err + } + out := cmd.OutOrStdout() + tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0) + fmt.Fprintln(tw, "ID\tNAME\tSTATE\tOK/TOTAL\tDETAIL") + for _, c := range st.Checks { + fmt.Fprintf(tw, "%s\t%s\t%s\t%d/%d\t%s\n", + c.CheckID, c.Name, c.State, c.OKCount, c.Total, c.Detail) + } + return tw.Flush() +} + +func masterOrNone(id string) string { + if id == "" { + return "(none — no quorum or election in progress)" + } + return id +} diff --git a/internal/cli/trust.go b/internal/cli/trust.go new file mode 100644 index 0000000..138f59f --- /dev/null +++ b/internal/cli/trust.go @@ -0,0 +1,74 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "text/tabwriter" + "time" + + "github.com/spf13/cobra" + + "github.com/jasper/quptime/internal/daemon" + "github.com/jasper/quptime/internal/trust" +) + +func addTrustCmd(root *cobra.Command) { + t := &cobra.Command{ + Use: "trust", + Short: "Inspect and edit the local trust store", + } + + list := &cobra.Command{ + Use: "list", + Short: "Print all trusted peer fingerprints", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + raw, err := callDaemon(ctx, daemon.CtrlTrustList, nil) + if err != nil { + return err + } + var entries []trust.Entry + if err := json.Unmarshal(raw, &entries); err != nil { + return err + } + tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) + fmt.Fprintln(tw, "NODE_ID\tADDRESS\tFINGERPRINT\tADDED") + for _, e := range entries { + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", + e.NodeID, e.Address, e.Fingerprint, e.AddedAt.Format(time.RFC3339)) + } + return tw.Flush() + }, + } + t.AddCommand(list) + + remove := &cobra.Command{ + Use: "remove ", + Short: "Drop a peer from the local trust store", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := 
context.WithTimeout(cmd.Context(), 10*time.Second) + defer cancel() + body := daemon.NodeRemoveBody{NodeID: args[0]} + raw, err := callDaemon(ctx, daemon.CtrlTrustRemove, body) + if err != nil { + return err + } + var res struct { + Removed bool `json:"removed"` + } + _ = json.Unmarshal(raw, &res) + if res.Removed { + fmt.Fprintf(cmd.OutOrStdout(), "trust entry %s removed\n", args[0]) + } else { + fmt.Fprintf(cmd.OutOrStdout(), "no trust entry for %s\n", args[0]) + } + return nil + }, + } + t.AddCommand(remove) + + root.AddCommand(t) +} diff --git a/internal/config/cluster.go b/internal/config/cluster.go new file mode 100644 index 0000000..365b713 --- /dev/null +++ b/internal/config/cluster.go @@ -0,0 +1,227 @@ +package config + +import ( + "fmt" + "os" + "sync" + "time" + + "gopkg.in/yaml.v3" +) + +// PeerInfo identifies a cluster member as known to all peers. +// (Trust material lives in trust.yaml; this struct stays portable.) +type PeerInfo struct { + NodeID string `yaml:"node_id"` + Advertise string `yaml:"advertise"` + Fingerprint string `yaml:"fingerprint"` +} + +// CheckType enumerates the supported probe kinds. +type CheckType string + +const ( + CheckHTTP CheckType = "http" + CheckTCP CheckType = "tcp" + CheckICMP CheckType = "icmp" +) + +// Check describes a single monitored target. +type Check struct { + ID string `yaml:"id"` + Name string `yaml:"name"` + Type CheckType `yaml:"type"` + Target string `yaml:"target"` // URL, host:port, or host + Interval time.Duration `yaml:"interval"` // default 30s + Timeout time.Duration `yaml:"timeout"` // default 10s + + // HTTP-only options. + ExpectStatus int `yaml:"expect_status,omitempty"` + BodyMatch string `yaml:"body_match,omitempty"` + + // AlertIDs lists which configured alerts fire when this check + // transitions state. + AlertIDs []string `yaml:"alert_ids,omitempty"` +} + +// AlertType enumerates supported notifier kinds. +type AlertType string + +const ( + AlertSMTP AlertType = "smtp" + AlertDiscord AlertType = "discord" +) + +// Alert describes a single notifier destination. +type Alert struct { + ID string `yaml:"id"` + Name string `yaml:"name"` + Type AlertType `yaml:"type"` + + // SMTP options. + SMTPHost string `yaml:"smtp_host,omitempty"` + SMTPPort int `yaml:"smtp_port,omitempty"` + SMTPUser string `yaml:"smtp_user,omitempty"` + SMTPPassword string `yaml:"smtp_password,omitempty"` + SMTPFrom string `yaml:"smtp_from,omitempty"` + SMTPTo []string `yaml:"smtp_to,omitempty"` + SMTPStartTLS bool `yaml:"smtp_starttls,omitempty"` + + // Discord options. + DiscordWebhook string `yaml:"discord_webhook,omitempty"` +} + +// ClusterConfig is the replicated cluster state. The Version field +// strictly increases on every mutation; the master is the only node +// that bumps it. +type ClusterConfig struct { + Version uint64 `yaml:"version"` + UpdatedAt time.Time `yaml:"updated_at"` + UpdatedBy string `yaml:"updated_by"` + + Peers []PeerInfo `yaml:"peers"` + Checks []Check `yaml:"checks"` + Alerts []Alert `yaml:"alerts"` + + mu sync.RWMutex `yaml:"-"` +} + +// LoadClusterConfig reads cluster.yaml. A missing file returns an +// empty (version 0) config — callers should treat that as the +// pre-bootstrap state. 
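+// For illustration, a small cluster.yaml written by this package looks
+// roughly like the following (hypothetical IDs and addresses; duration
+// fields are plain int64s to yaml.v3, so they serialize as nanoseconds):
+//
+//	version: 3
+//	updated_at: 2026-05-12T06:07:16Z
+//	updated_by: 6d1f…
+//	peers:
+//	  - node_id: 6d1f…
+//	    advertise: 10.0.0.1:9001
+//	    fingerprint: …
+//	checks:
+//	  - id: 9c2e…
+//	    name: api
+//	    type: http
+//	    target: https://example.com
+//	    interval: 30000000000
+//	    timeout: 10000000000
+//	alerts: []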
+func LoadClusterConfig() (*ClusterConfig, error) { + raw, err := os.ReadFile(ClusterFilePath()) + if err != nil { + if os.IsNotExist(err) { + return &ClusterConfig{}, nil + } + return nil, err + } + cfg := &ClusterConfig{} + if err := yaml.Unmarshal(raw, cfg); err != nil { + return nil, fmt.Errorf("parse cluster.yaml: %w", err) + } + return cfg, nil +} + +// Save writes cluster.yaml atomically. Caller is responsible for +// having already taken any external locks. +func (c *ClusterConfig) Save() error { + c.mu.RLock() + defer c.mu.RUnlock() + out, err := yaml.Marshal(c) + if err != nil { + return err + } + return AtomicWrite(ClusterFilePath(), out, 0o600) +} + +// Snapshot returns a deep-enough copy of the config that can be +// safely serialized while the original continues to mutate. +func (c *ClusterConfig) Snapshot() *ClusterConfig { + c.mu.RLock() + defer c.mu.RUnlock() + cp := &ClusterConfig{ + Version: c.Version, + UpdatedAt: c.UpdatedAt, + UpdatedBy: c.UpdatedBy, + Peers: append([]PeerInfo(nil), c.Peers...), + Checks: append([]Check(nil), c.Checks...), + Alerts: append([]Alert(nil), c.Alerts...), + } + return cp +} + +// Mutate runs fn under the config write lock, bumps Version on +// success, and writes the file. Only the master should call this. +func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) error { + c.mu.Lock() + defer c.mu.Unlock() + if err := fn(c); err != nil { + return err + } + c.Version++ + c.UpdatedAt = time.Now().UTC() + c.UpdatedBy = byNode + out, err := yaml.Marshal(c) + if err != nil { + return err + } + return AtomicWrite(ClusterFilePath(), out, 0o600) +} + +// Replace overwrites the local config with an incoming snapshot if +// that snapshot has a strictly greater version. Returns true if +// applied. +func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) { + c.mu.Lock() + defer c.mu.Unlock() + if incoming.Version <= c.Version { + return false, nil + } + c.Version = incoming.Version + c.UpdatedAt = incoming.UpdatedAt + c.UpdatedBy = incoming.UpdatedBy + c.Peers = append([]PeerInfo(nil), incoming.Peers...) + c.Checks = append([]Check(nil), incoming.Checks...) + c.Alerts = append([]Alert(nil), incoming.Alerts...) + out, err := yaml.Marshal(c) + if err != nil { + return false, err + } + if err := AtomicWrite(ClusterFilePath(), out, 0o600); err != nil { + return false, err + } + return true, nil +} + +// FindCheck returns the check with the given ID or name. +func (c *ClusterConfig) FindCheck(idOrName string) (*Check, int) { + c.mu.RLock() + defer c.mu.RUnlock() + for i := range c.Checks { + if c.Checks[i].ID == idOrName || c.Checks[i].Name == idOrName { + cp := c.Checks[i] + return &cp, i + } + } + return nil, -1 +} + +// FindAlert returns the alert with the given ID or name. +func (c *ClusterConfig) FindAlert(idOrName string) (*Alert, int) { + c.mu.RLock() + defer c.mu.RUnlock() + for i := range c.Alerts { + if c.Alerts[i].ID == idOrName || c.Alerts[i].Name == idOrName { + cp := c.Alerts[i] + return &cp, i + } + } + return nil, -1 +} + +// FindPeer returns the peer with the given node ID. +func (c *ClusterConfig) FindPeer(nodeID string) (*PeerInfo, int) { + c.mu.RLock() + defer c.mu.RUnlock() + for i := range c.Peers { + if c.Peers[i].NodeID == nodeID { + cp := c.Peers[i] + return &cp, i + } + } + return nil, -1 +} + +// QuorumSize returns the minimum number of live nodes required for +// the cluster to make progress: floor(N/2) + 1. 
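+// For example: 1 node → 1, 2 → 2, 3 → 2, 4 → 3, 5 → 3.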
+func (c *ClusterConfig) QuorumSize() int { + c.mu.RLock() + defer c.mu.RUnlock() + n := len(c.Peers) + if n == 0 { + return 1 + } + return n/2 + 1 +} diff --git a/internal/config/node.go b/internal/config/node.go new file mode 100644 index 0000000..3c4c27e --- /dev/null +++ b/internal/config/node.go @@ -0,0 +1,67 @@ +package config + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +// NodeConfig is the per-node, never-replicated identity file. +type NodeConfig struct { + // NodeID is a stable UUID generated at `qu init`. Used by all peers + // to refer to this node across restarts and IP changes. + NodeID string `yaml:"node_id"` + + // BindAddr is the address the daemon listens on for inter-node + // traffic. Defaults to 0.0.0.0. + BindAddr string `yaml:"bind_addr"` + + // BindPort is the port the daemon listens on. Default 9001. + BindPort int `yaml:"bind_port"` + + // Advertise is the address other nodes use to reach us. May differ + // from BindAddr when behind NAT. Set explicitly via `qu init --advertise`. + Advertise string `yaml:"advertise"` +} + +// AdvertiseAddr returns the address peers should dial. Falls back to +// BindAddr:BindPort if Advertise is empty. +func (n *NodeConfig) AdvertiseAddr() string { + if n.Advertise != "" { + return n.Advertise + } + bind := n.BindAddr + if bind == "" || bind == "0.0.0.0" || bind == "::" { + bind = "127.0.0.1" + } + return fmt.Sprintf("%s:%d", bind, n.BindPort) +} + +// LoadNodeConfig reads node.yaml from the data dir. +func LoadNodeConfig() (*NodeConfig, error) { + raw, err := os.ReadFile(NodeFilePath()) + if err != nil { + return nil, err + } + cfg := &NodeConfig{} + if err := yaml.Unmarshal(raw, cfg); err != nil { + return nil, fmt.Errorf("parse node.yaml: %w", err) + } + if cfg.BindPort == 0 { + cfg.BindPort = 9001 + } + if cfg.BindAddr == "" { + cfg.BindAddr = "0.0.0.0" + } + return cfg, nil +} + +// Save writes node.yaml atomically. +func (n *NodeConfig) Save() error { + out, err := yaml.Marshal(n) + if err != nil { + return err + } + return AtomicWrite(NodeFilePath(), out, 0o600) +} diff --git a/internal/config/paths.go b/internal/config/paths.go new file mode 100644 index 0000000..4335818 --- /dev/null +++ b/internal/config/paths.go @@ -0,0 +1,140 @@ +// Package config owns the on-disk layout of a node's state. +// +// Two YAML files live under the data directory: +// +// node.yaml — local identity, never replicated (id, addresses, key paths) +// cluster.yaml — replicated state (peers, checks, alerts, version) +// trust.yaml — local fingerprint trust store +// keys/ — RSA private + public keys + self-signed cert +// state.json — runtime cache (last check results, current master) +// +// A unix socket for the local CLI lives alongside (defaults to +// /var/run/quptime/quptime.sock when running as root, otherwise +// $XDG_RUNTIME_DIR/quptime/quptime.sock). +package config + +import ( + "errors" + "os" + "path/filepath" +) + +// Default file names. Callers should always go through DataDir() so an +// override via QUPTIME_DIR is respected. +const ( + NodeFile = "node.yaml" + ClusterFile = "cluster.yaml" + TrustFile = "trust.yaml" + StateFile = "state.json" + KeysDir = "keys" + PrivateKey = "private.pem" + PublicKey = "public.pem" + CertFile = "cert.pem" + SocketName = "quptime.sock" + + envDataDir = "QUPTIME_DIR" +) + +// DataDir returns the configured data directory. Order of resolution: +// 1. $QUPTIME_DIR if set +// 2. /etc/quptime when running as root +// 3. 
$XDG_CONFIG_HOME/quptime (or ~/.config/quptime) otherwise +func DataDir() string { + if v := os.Getenv(envDataDir); v != "" { + return v + } + if os.Geteuid() == 0 { + return "/etc/quptime" + } + if v := os.Getenv("XDG_CONFIG_HOME"); v != "" { + return filepath.Join(v, "quptime") + } + home, err := os.UserHomeDir() + if err != nil || home == "" { + return "./quptime" + } + return filepath.Join(home, ".config", "quptime") +} + +// SocketPath returns the unix socket used for local CLI ↔ daemon control. +func SocketPath() string { + if v := os.Getenv("QUPTIME_SOCKET"); v != "" { + return v + } + if os.Geteuid() == 0 { + return "/var/run/quptime/" + SocketName + } + if v := os.Getenv("XDG_RUNTIME_DIR"); v != "" { + return filepath.Join(v, "quptime", SocketName) + } + return filepath.Join(os.TempDir(), "quptime-"+envUserSuffix(), SocketName) +} + +func envUserSuffix() string { + if u := os.Getenv("USER"); u != "" { + return u + } + return "default" +} + +// NodeFilePath returns the absolute path to node.yaml. +func NodeFilePath() string { return filepath.Join(DataDir(), NodeFile) } + +// ClusterFilePath returns the absolute path to cluster.yaml. +func ClusterFilePath() string { return filepath.Join(DataDir(), ClusterFile) } + +// TrustFilePath returns the absolute path to trust.yaml. +func TrustFilePath() string { return filepath.Join(DataDir(), TrustFile) } + +// StateFilePath returns the absolute path to state.json. +func StateFilePath() string { return filepath.Join(DataDir(), StateFile) } + +// PrivateKeyPath returns the absolute path to the RSA private key. +func PrivateKeyPath() string { return filepath.Join(DataDir(), KeysDir, PrivateKey) } + +// PublicKeyPath returns the absolute path to the RSA public key. +func PublicKeyPath() string { return filepath.Join(DataDir(), KeysDir, PublicKey) } + +// CertFilePath returns the absolute path to the self-signed cert (PEM). +func CertFilePath() string { return filepath.Join(DataDir(), KeysDir, CertFile) } + +// EnsureDataDir creates the data directory tree if absent. +func EnsureDataDir() error { + dir := DataDir() + if err := os.MkdirAll(filepath.Join(dir, KeysDir), 0o700); err != nil { + return err + } + return os.MkdirAll(filepath.Dir(SocketPath()), 0o700) +} + +// AtomicWrite writes data to path through a temp file + rename. The temp +// file is created in the same directory so the rename is atomic on POSIX. +func AtomicWrite(path string, data []byte, perm os.FileMode) error { + if path == "" { + return errors.New("empty path") + } + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o700); err != nil { + return err + } + tmp, err := os.CreateTemp(dir, filepath.Base(path)+".tmp-*") + if err != nil { + return err + } + tmpName := tmp.Name() + if _, err := tmp.Write(data); err != nil { + tmp.Close() + os.Remove(tmpName) + return err + } + if err := tmp.Chmod(perm); err != nil { + tmp.Close() + os.Remove(tmpName) + return err + } + if err := tmp.Close(); err != nil { + os.Remove(tmpName) + return err + } + return os.Rename(tmpName, path) +} diff --git a/internal/crypto/cert.go b/internal/crypto/cert.go new file mode 100644 index 0000000..c84f8dd --- /dev/null +++ b/internal/crypto/cert.go @@ -0,0 +1,77 @@ +package crypto + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "crypto/x509/pkix" + "encoding/hex" + "encoding/pem" + "errors" + "fmt" + "math/big" + "time" +) + +// CertValidity is how long self-signed certs are valid for. We use a +// long horizon because cert rotation is operator-driven, not automatic. 
+const CertValidity = 10 * 365 * 24 * time.Hour + +// buildSelfSignedCert produces an X.509 certificate signed by `priv` +// itself, using the given common name. Returns the DER bytes. +func buildSelfSignedCert(priv *rsa.PrivateKey, commonName string) ([]byte, error) { + serial, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128)) + if err != nil { + return nil, err + } + tmpl := &x509.Certificate{ + SerialNumber: serial, + Subject: pkix.Name{CommonName: commonName, Organization: []string{"quptime"}}, + NotBefore: time.Now().Add(-1 * time.Hour), + NotAfter: time.Now().Add(CertValidity), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + IsCA: false, + } + return x509.CreateCertificate(rand.Reader, tmpl, tmpl, &priv.PublicKey, priv) +} + +// Fingerprint computes the SHA-256 fingerprint of an X.509 certificate's +// SubjectPublicKeyInfo (the same hash used by `openssl x509 -pubkey -noout +// | openssl dgst -sha256`). Returns the lowercase hex digest with a +// "sha256:" prefix to match SSH conventions. +func Fingerprint(cert *x509.Certificate) string { + return FingerprintFromSPKI(cert.RawSubjectPublicKeyInfo) +} + +// FingerprintFromSPKI is the underlying helper. +func FingerprintFromSPKI(spki []byte) string { + sum := sha256.Sum256(spki) + return "sha256:" + hex.EncodeToString(sum[:]) +} + +// FingerprintFromCertPEM parses a PEM-encoded certificate and returns +// its fingerprint. +func FingerprintFromCertPEM(certPEM []byte) (string, error) { + block, _ := pem.Decode(certPEM) + if block == nil { + return "", errors.New("cert: no PEM block") + } + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return "", fmt.Errorf("parse cert: %w", err) + } + return Fingerprint(cert), nil +} + +// FingerprintFromPubKeyPEM parses a public-key PEM and returns its +// fingerprint over the same SPKI bytes. +func FingerprintFromPubKeyPEM(pubPEM []byte) (string, error) { + block, _ := pem.Decode(pubPEM) + if block == nil { + return "", errors.New("pubkey: no PEM block") + } + return FingerprintFromSPKI(block.Bytes), nil +} diff --git a/internal/crypto/keys.go b/internal/crypto/keys.go new file mode 100644 index 0000000..c5f569e --- /dev/null +++ b/internal/crypto/keys.go @@ -0,0 +1,100 @@ +// Package crypto handles the RSA key material every node uses for +// mutual TLS authentication and for the trust-store fingerprint pinning. +// +// Keys are RSA-3072 (NIST 112-bit security, well within the safe band +// through ~2030). They live PEM-encoded under /keys/. +package crypto + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "os" + + "github.com/jasper/quptime/internal/config" +) + +// KeySize is the RSA modulus size used by qu. +const KeySize = 3072 + +// GenerateKeyPair creates a fresh RSA keypair and writes the private, +// public, and self-signed certificate to the standard paths. +// It refuses to overwrite existing keys. 
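+//
+// A minimal usage sketch (illustrative; the real caller is `qu init`,
+// which generates the node UUID and presumably passes it as the
+// certificate CommonName, since peers recover the NodeID from the
+// cert's CN):
+//
+//    nodeID := uuid.NewString()
+//    if _, err := crypto.GenerateKeyPair(nodeID); err != nil {
+//        return err
+//    }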
+func GenerateKeyPair(commonName string) (*rsa.PrivateKey, error) { + if _, err := os.Stat(config.PrivateKeyPath()); err == nil { + return nil, errors.New("key material already exists; refusing to overwrite") + } + if err := config.EnsureDataDir(); err != nil { + return nil, err + } + priv, err := rsa.GenerateKey(rand.Reader, KeySize) + if err != nil { + return nil, fmt.Errorf("generate rsa key: %w", err) + } + if err := writePEM(config.PrivateKeyPath(), "RSA PRIVATE KEY", + x509.MarshalPKCS1PrivateKey(priv), 0o600); err != nil { + return nil, err + } + pubDER, err := x509.MarshalPKIXPublicKey(&priv.PublicKey) + if err != nil { + return nil, err + } + if err := writePEM(config.PublicKeyPath(), "PUBLIC KEY", pubDER, 0o644); err != nil { + return nil, err + } + certDER, err := buildSelfSignedCert(priv, commonName) + if err != nil { + return nil, err + } + if err := writePEM(config.CertFilePath(), "CERTIFICATE", certDER, 0o644); err != nil { + return nil, err + } + return priv, nil +} + +// LoadPrivateKey reads the on-disk RSA private key. +func LoadPrivateKey() (*rsa.PrivateKey, error) { + raw, err := os.ReadFile(config.PrivateKeyPath()) + if err != nil { + return nil, err + } + block, _ := pem.Decode(raw) + if block == nil { + return nil, errors.New("private key: no PEM block") + } + switch block.Type { + case "RSA PRIVATE KEY": + return x509.ParsePKCS1PrivateKey(block.Bytes) + case "PRIVATE KEY": + k, err := x509.ParsePKCS8PrivateKey(block.Bytes) + if err != nil { + return nil, err + } + rk, ok := k.(*rsa.PrivateKey) + if !ok { + return nil, errors.New("private key: not RSA") + } + return rk, nil + default: + return nil, fmt.Errorf("private key: unexpected PEM type %q", block.Type) + } +} + +// LoadCertPEM reads the self-signed cert file (used as the TLS leaf). +func LoadCertPEM() ([]byte, error) { + return os.ReadFile(config.CertFilePath()) +} + +// LoadPublicKeyPEM reads the public-key PEM (exchanged out of band +// during invite / join). +func LoadPublicKeyPEM() ([]byte, error) { + return os.ReadFile(config.PublicKeyPath()) +} + +func writePEM(path, blockType string, der []byte, perm os.FileMode) error { + encoded := pem.EncodeToMemory(&pem.Block{Type: blockType, Bytes: der}) + return config.AtomicWrite(path, encoded, perm) +} diff --git a/internal/daemon/control.go b/internal/daemon/control.go new file mode 100644 index 0000000..731626e --- /dev/null +++ b/internal/daemon/control.go @@ -0,0 +1,383 @@ +package daemon + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "sync" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/crypto" + "github.com/jasper/quptime/internal/transport" + "github.com/jasper/quptime/internal/trust" +) + +// controlMaxFrame caps unix-socket request/response frames. Generous +// because cluster.yaml snapshots travel over this channel too. +const controlMaxFrame = 16 * 1024 * 1024 + +// Control method names. Defined as constants so the CLI side cannot +// drift out of sync with the daemon. +const ( + CtrlStatus = "status" + CtrlMutate = "mutate" + CtrlNodeProbe = "node.probe" + CtrlNodeAdd = "node.add" + CtrlNodeRemove = "node.remove" + CtrlTrustList = "trust.list" + CtrlTrustRemove = "trust.remove" + CtrlAlertTest = "alert.test" +) + +// CtrlRequest is the wire envelope for a CLI ↔ daemon message. 
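+//
+// On the wire, every control message is a 4-byte big-endian length
+// followed by the JSON body (see writeCtrlFrame / readCtrlFrame
+// below). An illustrative status exchange, framing omitted:
+//
+//    -> {"method":"status"}
+//    <- {"body":{"node_id":"node-a","master_id":"node-a","has_quorum":true, ...}}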
+type CtrlRequest struct { + Method string `json:"method"` + Body json.RawMessage `json:"body,omitempty"` +} + +// CtrlResponse carries either an error or a result body. +type CtrlResponse struct { + Error string `json:"error,omitempty"` + Body json.RawMessage `json:"body,omitempty"` +} + +// MutateBody is the payload of CtrlMutate. +type MutateBody struct { + Kind transport.MutationKind `json:"kind"` + Payload json.RawMessage `json:"payload"` +} + +// MutateResult reports the new cluster version after a successful +// mutation. +type MutateResult struct { + Version uint64 `json:"version"` +} + +// NodeProbeBody is the payload of CtrlNodeProbe. +type NodeProbeBody struct { + Address string `json:"address"` +} + +// NodeProbeResult lets the CLI show the operator what they're about +// to trust. +type NodeProbeResult struct { + NodeID string `json:"node_id"` + Fingerprint string `json:"fingerprint"` + CertPEM string `json:"cert_pem"` +} + +// NodeAddBody captures everything the daemon needs once the operator +// has confirmed the fingerprint. +type NodeAddBody struct { + Address string `json:"address"` + Fingerprint string `json:"fingerprint"` +} + +// NodeAddResult is returned when a peer has been trusted, joined, and +// added to the cluster config. +type NodeAddResult struct { + NodeID string `json:"node_id"` + Version uint64 `json:"version"` +} + +// AlertTestBody is the payload of CtrlAlertTest. +type AlertTestBody struct { + AlertID string `json:"alert_id"` +} + +// NodeRemoveBody / TrustRemoveBody share the same shape. +type NodeRemoveBody struct { + NodeID string `json:"node_id"` +} + +// controlServer accepts CLI commands over a unix socket. +type controlServer struct { + d *Daemon + + mu sync.Mutex + ln net.Listener + conns map[net.Conn]struct{} +} + +func newControlServer(d *Daemon) *controlServer { + return &controlServer{d: d, conns: map[net.Conn]struct{}{}} +} + +// Serve binds the unix socket and dispatches commands until ctx is +// cancelled. +func (c *controlServer) Serve(ctx context.Context) error { + sockPath := config.SocketPath() + if err := os.MkdirAll(filepath.Dir(sockPath), 0o700); err != nil { + return fmt.Errorf("control socket dir: %w", err) + } + // stale socket from a previous crash — unlink before binding + if fi, err := os.Stat(sockPath); err == nil && fi.Mode()&os.ModeSocket != 0 { + _ = os.Remove(sockPath) + } + ln, err := net.Listen("unix", sockPath) + if err != nil { + return fmt.Errorf("listen %s: %w", sockPath, err) + } + if err := os.Chmod(sockPath, 0o600); err != nil { + _ = ln.Close() + return fmt.Errorf("chmod %s: %w", sockPath, err) + } + c.mu.Lock() + c.ln = ln + c.mu.Unlock() + + go func() { + <-ctx.Done() + _ = ln.Close() + }() + + for { + conn, err := ln.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + go c.handleConn(ctx, conn) + } +} + +// Stop closes the listener and any in-flight connections. 
+func (c *controlServer) Stop() { + c.mu.Lock() + if c.ln != nil { + _ = c.ln.Close() + } + for cn := range c.conns { + _ = cn.Close() + } + c.conns = map[net.Conn]struct{}{} + c.mu.Unlock() +} + +func (c *controlServer) handleConn(ctx context.Context, conn net.Conn) { + c.mu.Lock() + c.conns[conn] = struct{}{} + c.mu.Unlock() + defer func() { + c.mu.Lock() + delete(c.conns, conn) + c.mu.Unlock() + _ = conn.Close() + }() + + body, err := readCtrlFrame(conn) + if err != nil { + return + } + var req CtrlRequest + if err := json.Unmarshal(body, &req); err != nil { + _ = writeCtrlResponse(conn, CtrlResponse{Error: "decode: " + err.Error()}) + return + } + resp := c.dispatch(ctx, req) + _ = writeCtrlResponse(conn, resp) +} + +func (c *controlServer) dispatch(ctx context.Context, req CtrlRequest) CtrlResponse { + switch req.Method { + case CtrlStatus: + return ok(c.d.buildStatus()) + + case CtrlMutate: + var body MutateBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + var payload json.RawMessage = body.Payload + ver, err := c.d.replicator.LocalMutate(ctx, body.Kind, json.RawMessage(payload)) + if err != nil { + return fail(err) + } + return ok(MutateResult{Version: ver}) + + case CtrlNodeProbe: + var body NodeProbeBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + sample, err := transport.FetchPeerCert(ctx, c.d.assets, body.Address) + if err != nil { + return fail(err) + } + return ok(NodeProbeResult{ + NodeID: sample.Cert.Subject.CommonName, + Fingerprint: sample.Fingerprint, + CertPEM: string(sample.CertPEM), + }) + + case CtrlNodeAdd: + var body NodeAddBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + result, err := c.d.nodeAdd(ctx, body) + if err != nil { + return fail(err) + } + return ok(result) + + case CtrlNodeRemove: + var body NodeRemoveBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + ver, err := c.d.replicator.LocalMutate(ctx, transport.MutationRemovePeer, body.NodeID) + if err != nil { + return fail(err) + } + if _, err := c.d.trust.Remove(body.NodeID); err != nil { + return fail(err) + } + return ok(MutateResult{Version: ver}) + + case CtrlTrustList: + return ok(c.d.trust.List()) + + case CtrlTrustRemove: + var body NodeRemoveBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + removed, err := c.d.trust.Remove(body.NodeID) + if err != nil { + return fail(err) + } + return ok(map[string]bool{"removed": removed}) + + case CtrlAlertTest: + var body AlertTestBody + if err := json.Unmarshal(req.Body, &body); err != nil { + return fail(err) + } + if err := c.d.dispatcher.Test(body.AlertID); err != nil { + return fail(err) + } + return ok(map[string]string{"status": "sent"}) + + default: + return CtrlResponse{Error: "unknown method: " + req.Method} + } +} + +// nodeAdd is the daemon-side TOFU completion path: it probes the peer +// for its current cert (verifying the fingerprint matches what the +// operator approved), records a trust entry, swaps trust with the +// peer via the Join RPC, and finally proposes a cluster mutation to +// list the peer in cluster.yaml. 
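+//
+// Illustrative control-plane sequence driving this path (method names
+// as defined above, values invented):
+//
+//    1. CtrlNodeProbe  {"address":"10.0.0.6:9001"} -> node_id, fingerprint, cert_pem
+//    2. the operator verifies the fingerprint out of band and confirms
+//    3. CtrlNodeAdd    {"address":"10.0.0.6:9001","fingerprint":"sha256:..."} -> nodeAdd below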
+func (d *Daemon) nodeAdd(ctx context.Context, body NodeAddBody) (NodeAddResult, error) { + sample, err := transport.FetchPeerCert(ctx, d.assets, body.Address) + if err != nil { + return NodeAddResult{}, fmt.Errorf("re-probe: %w", err) + } + if sample.Fingerprint != body.Fingerprint { + return NodeAddResult{}, fmt.Errorf("fingerprint changed since probe: got %s want %s", + sample.Fingerprint, body.Fingerprint) + } + peerID := sample.Cert.Subject.CommonName + if peerID == "" { + return NodeAddResult{}, errors.New("peer cert has no CommonName / NodeID") + } + + if err := d.trust.Add(trust.Entry{ + NodeID: peerID, + Address: body.Address, + Fingerprint: sample.Fingerprint, + CertPEM: string(sample.CertPEM), + }); err != nil { + return NodeAddResult{}, fmt.Errorf("trust add: %w", err) + } + + // Ask the peer to record us symmetrically. + myFP, err := crypto.FingerprintFromCertPEM(d.assets.Cert) + if err != nil { + return NodeAddResult{}, fmt.Errorf("own fingerprint: %w", err) + } + joinReq := transport.JoinRequest{ + NodeID: d.node.NodeID, + Advertise: d.node.AdvertiseAddr(), + Fingerprint: myFP, + CertPEM: string(d.assets.Cert), + } + var joinResp transport.JoinResponse + if err := d.client.Call(ctx, peerID, body.Address, transport.MethodJoin, joinReq, &joinResp); err != nil { + return NodeAddResult{}, fmt.Errorf("join %s: %w", peerID, err) + } + if !joinResp.Accepted { + return NodeAddResult{}, fmt.Errorf("peer rejected join: %s", joinResp.Error) + } + + // Propose the cluster-config addition. Routed to master via the + // replicator; if we are the master, applied directly. + peerInfo := config.PeerInfo{ + NodeID: peerID, + Advertise: body.Address, + Fingerprint: sample.Fingerprint, + } + ver, err := d.replicator.LocalMutate(ctx, transport.MutationAddPeer, peerInfo) + if err != nil { + return NodeAddResult{}, fmt.Errorf("propose peer: %w", err) + } + return NodeAddResult{NodeID: peerID, Version: ver}, nil +} + +func ok(v any) CtrlResponse { + raw, err := json.Marshal(v) + if err != nil { + return CtrlResponse{Error: err.Error()} + } + return CtrlResponse{Body: raw} +} + +func fail(err error) CtrlResponse { + return CtrlResponse{Error: err.Error()} +} + +func writeCtrlResponse(w io.Writer, resp CtrlResponse) error { + body, err := json.Marshal(resp) + if err != nil { + return err + } + return writeCtrlFrame(w, body) +} + +func writeCtrlFrame(w io.Writer, body []byte) error { + if len(body) > controlMaxFrame { + return errors.New("control frame too large") + } + var hdr [4]byte + binary.BigEndian.PutUint32(hdr[:], uint32(len(body))) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + _, err := w.Write(body) + return err +} + +func readCtrlFrame(r io.Reader) ([]byte, error) { + var hdr [4]byte + if _, err := io.ReadFull(r, hdr[:]); err != nil { + return nil, err + } + n := binary.BigEndian.Uint32(hdr[:]) + if n > controlMaxFrame { + return nil, errors.New("control frame too large") + } + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, err + } + return buf, nil +} diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go new file mode 100644 index 0000000..5289875 --- /dev/null +++ b/internal/daemon/daemon.go @@ -0,0 +1,223 @@ +// Package daemon ties every long-running component together. +// +// Lifecycle +// +// - Load node identity, cluster config, trust store, and key material. +// - Build a transport.Client + transport.Server, share TLS assets. 
+// - Construct the quorum manager, replicator, aggregator and alert +// dispatcher; wire transport handlers; wire the version observer +// to the replicator's pull path; gate alert dispatch on +// "I am the master". +// - Start the inter-node listener, the local unix-socket control +// plane, the heartbeat loop and the check scheduler. +// - On ctx cancel, gracefully tear everything down. +package daemon + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/jasper/quptime/internal/alerts" + "github.com/jasper/quptime/internal/checks" + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/crypto" + "github.com/jasper/quptime/internal/quorum" + "github.com/jasper/quptime/internal/replicate" + "github.com/jasper/quptime/internal/transport" + "github.com/jasper/quptime/internal/trust" +) + +// Daemon is the live process: every long-running component lives here. +type Daemon struct { + logger *log.Logger + + node *config.NodeConfig + cluster *config.ClusterConfig + trust *trust.Store + + assets *transport.TLSAssets + client *transport.Client + server *transport.Server + + quorum *quorum.Manager + replicator *replicate.Replicator + aggregator *checks.Aggregator + dispatcher *alerts.Dispatcher + scheduler *checks.Scheduler + + control *controlServer + wg sync.WaitGroup +} + +// New loads every persistent piece of state and assembles the daemon. +// It does not start any goroutines. +func New(logger *log.Logger) (*Daemon, error) { + if logger == nil { + logger = log.New(os.Stderr, "quptime: ", log.LstdFlags|log.Lmsgprefix) + } + + node, err := config.LoadNodeConfig() + if err != nil { + return nil, fmt.Errorf("load node.yaml: %w", err) + } + if node.NodeID == "" { + return nil, errors.New("node.yaml has empty node_id — run `qu init` first") + } + + cluster, err := config.LoadClusterConfig() + if err != nil { + return nil, fmt.Errorf("load cluster.yaml: %w", err) + } + + store, err := trust.Load() + if err != nil { + return nil, fmt.Errorf("load trust.yaml: %w", err) + } + + priv, err := crypto.LoadPrivateKey() + if err != nil { + return nil, fmt.Errorf("load private key: %w", err) + } + certPEM, err := crypto.LoadCertPEM() + if err != nil { + return nil, fmt.Errorf("load cert: %w", err) + } + + assets := &transport.TLSAssets{Cert: certPEM, Key: priv, Trust: store} + client := transport.NewClient(assets) + server := transport.NewServer(assets) + + d := &Daemon{ + logger: logger, + node: node, + cluster: cluster, + trust: store, + assets: assets, + client: client, + server: server, + } + + d.quorum = quorum.New(node.NodeID, cluster, client) + d.replicator = replicate.New(node.NodeID, cluster, client, d.quorum) + d.aggregator = checks.NewAggregator(cluster, nil) + d.dispatcher = alerts.New(cluster, node.NodeID, logger) + + d.aggregator.SetTransition(func(check *config.Check, from, to checks.State, snap checks.Snapshot) { + if !d.quorum.IsMaster() { + return + } + d.dispatcher.OnTransition(check, from, to, snap) + }) + + d.quorum.SetVersionObserver(func(peerID, peerAddr string, peerVer uint64) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := d.replicator.PullFrom(ctx, peerID, peerAddr); err != nil { + d.logger.Printf("replicate: pull from %s: %v", peerID, err) + } + }) + + d.scheduler = checks.NewScheduler(cluster, &sink{d: d}) + d.control = newControlServer(d) + d.registerHandlers() + return d, nil +} + +// Run binds the inter-node listener and the local control socket, +// 
starts the quorum loop and the scheduler, and blocks until ctx is +// cancelled. +func (d *Daemon) Run(ctx context.Context) error { + addr := fmt.Sprintf("%s:%d", d.node.BindAddr, d.node.BindPort) + d.logger.Printf("listening on %s as node %s", addr, d.node.NodeID) + + servErr := make(chan error, 1) + d.wg.Add(1) + go func() { + defer d.wg.Done() + servErr <- d.server.Serve(ctx, addr) + }() + + ctrlErr := make(chan error, 1) + d.wg.Add(1) + go func() { + defer d.wg.Done() + ctrlErr <- d.control.Serve(ctx) + }() + + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.quorum.Start(ctx) + }() + + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.scheduler.Start(ctx) + }() + + select { + case <-ctx.Done(): + case err := <-servErr: + if err != nil { + d.logger.Printf("transport server exited: %v", err) + } + case err := <-ctrlErr: + if err != nil { + d.logger.Printf("control server exited: %v", err) + } + } + + d.server.Stop() + d.control.Stop() + d.client.Close() + d.wg.Wait() + return nil +} + +// sink routes scheduled probe results either into the local +// aggregator (when self is master) or to the current master over +// RPC. Implements checks.Sink. +type sink struct{ d *Daemon } + +func (s *sink) Submit(r checks.Result) { + if s.d.quorum.IsMaster() { + s.d.aggregator.Submit(s.d.node.NodeID, r) + return + } + masterID := s.d.quorum.Master() + if masterID == "" { + return // no master right now — drop; we'll probe again next interval + } + addr := s.d.addressOf(masterID) + if addr == "" { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + req := transport.ReportResultRequest{ + FromNodeID: s.d.node.NodeID, + CheckID: r.CheckID, + OK: r.OK, + Detail: r.Detail, + LatencyMS: r.Latency.Milliseconds(), + At: r.Timestamp, + } + if err := s.d.client.Call(ctx, masterID, addr, transport.MethodReportResult, req, nil); err != nil { + s.d.logger.Printf("report to master %s: %v", masterID, err) + } +} + +func (d *Daemon) addressOf(nodeID string) string { + for _, p := range d.cluster.Snapshot().Peers { + if p.NodeID == nodeID { + return p.Advertise + } + } + return "" +} diff --git a/internal/daemon/handlers.go b/internal/daemon/handlers.go new file mode 100644 index 0000000..1f92cb2 --- /dev/null +++ b/internal/daemon/handlers.go @@ -0,0 +1,150 @@ +package daemon + +import ( + "context" + "encoding/json" + "time" + + "github.com/jasper/quptime/internal/checks" + "github.com/jasper/quptime/internal/crypto" + "github.com/jasper/quptime/internal/transport" + "github.com/jasper/quptime/internal/trust" +) + +// registerHandlers wires every inter-node RPC method that the daemon +// understands onto the transport server. Each method delegates to the +// owning subsystem (quorum, replicator, etc.) so this file stays a +// thin dispatch table. 
+func (d *Daemon) registerHandlers() { + d.server.Handle(transport.MethodPing, func(_ context.Context, _ string, _ json.RawMessage) (any, error) { + return transport.PingResponse{NodeID: d.node.NodeID, Now: time.Now().UTC()}, nil + }) + + d.server.Handle(transport.MethodWhoAmI, func(_ context.Context, _ string, _ json.RawMessage) (any, error) { + fp, err := crypto.FingerprintFromCertPEM(d.assets.Cert) + if err != nil { + return nil, err + } + return transport.WhoAmIResponse{ + NodeID: d.node.NodeID, + Advertise: d.node.AdvertiseAddr(), + Fingerprint: fp, + CertPEM: string(d.assets.Cert), + }, nil + }) + + d.server.Handle(transport.MethodJoin, func(_ context.Context, _ string, raw json.RawMessage) (any, error) { + var req transport.JoinRequest + if err := json.Unmarshal(raw, &req); err != nil { + return transport.JoinResponse{Error: err.Error()}, nil + } + fp, err := crypto.FingerprintFromCertPEM([]byte(req.CertPEM)) + if err != nil { + return transport.JoinResponse{Error: "parse cert: " + err.Error()}, nil + } + if fp != req.Fingerprint { + return transport.JoinResponse{Error: "fingerprint mismatch"}, nil + } + // Outbound join (the proposing node already accepted our cert + // out of band). Symmetric trust is required for mTLS to work, + // so we accept the join automatically. Operators who need + // stricter onboarding can disable the listener and use the + // CLI flow exclusively. + if err := d.trust.Add(trust.Entry{ + NodeID: req.NodeID, + Address: req.Advertise, + Fingerprint: req.Fingerprint, + CertPEM: req.CertPEM, + }); err != nil { + return transport.JoinResponse{Error: err.Error()}, nil + } + return transport.JoinResponse{Accepted: true}, nil + }) + + d.server.Handle(transport.MethodHeartbeat, func(_ context.Context, _ string, raw json.RawMessage) (any, error) { + var req transport.HeartbeatRequest + if err := json.Unmarshal(raw, &req); err != nil { + return nil, err + } + return d.quorum.HandleHeartbeat(req), nil + }) + + d.server.Handle(transport.MethodGetClusterCfg, func(_ context.Context, _ string, _ json.RawMessage) (any, error) { + return d.replicator.HandleGetClusterCfg(), nil + }) + + d.server.Handle(transport.MethodApplyClusterCfg, func(_ context.Context, _ string, raw json.RawMessage) (any, error) { + var req transport.ApplyClusterCfgRequest + if err := json.Unmarshal(raw, &req); err != nil { + return nil, err + } + return d.replicator.HandleApplyClusterCfg(req), nil + }) + + d.server.Handle(transport.MethodProposeMutation, func(ctx context.Context, _ string, raw json.RawMessage) (any, error) { + var req transport.ProposeMutationRequest + if err := json.Unmarshal(raw, &req); err != nil { + return nil, err + } + return d.replicator.HandleProposeMutation(ctx, req), nil + }) + + d.server.Handle(transport.MethodReportResult, func(_ context.Context, _ string, raw json.RawMessage) (any, error) { + var req transport.ReportResultRequest + if err := json.Unmarshal(raw, &req); err != nil { + return nil, err + } + res := checks.Result{ + CheckID: req.CheckID, + OK: req.OK, + Detail: req.Detail, + Latency: time.Duration(req.LatencyMS) * time.Millisecond, + Timestamp: req.At, + } + d.aggregator.Submit(req.FromNodeID, res) + return transport.ReportResultResponse{}, nil + }) + + d.server.Handle(transport.MethodStatus, func(_ context.Context, _ string, _ json.RawMessage) (any, error) { + return d.buildStatus(), nil + }) +} + +// buildStatus is shared by both the inter-node Status RPC handler and +// the local control plane's "status" command. 
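+//
+// An abbreviated, illustrative response (values invented):
+//
+//    {
+//      "node_id": "node-a",
+//      "term": 7,
+//      "master_id": "node-a",
+//      "config_version": 12,
+//      "has_quorum": true,
+//      "quorum_size": 2,
+//      "peers":  [{"node_id": "node-b", "advertise": "10.0.0.6:9001", "live": true, ...}],
+//      "checks": [{"check_id": "web", "name": "public website", "state": "up", "ok_count": 3, "total": 3}]
+//    }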
+func (d *Daemon) buildStatus() transport.StatusResponse { + snap := d.cluster.Snapshot() + liveness := d.quorum.Liveness() + live := map[string]bool{} + for _, id := range d.quorum.LiveSet() { + live[id] = true + } + + out := transport.StatusResponse{ + NodeID: d.node.NodeID, + Term: d.quorum.Term(), + MasterID: d.quorum.Master(), + Version: snap.Version, + HasQuorum: d.quorum.HasQuorum(), + QuorumSize: snap.QuorumSize(), + } + for _, p := range snap.Peers { + out.Peers = append(out.Peers, transport.PeerLiveness{ + NodeID: p.NodeID, + Advertise: p.Advertise, + Live: live[p.NodeID], + LastSeen: liveness[p.NodeID], + }) + } + for _, c := range snap.Checks { + cs := transport.CheckSnapshot{CheckID: c.ID, Name: c.Name, State: "unknown"} + if agg, ok := d.aggregator.SnapshotFor(c.ID); ok { + cs.State = string(agg.State) + cs.OKCount = agg.OKCount + cs.Total = agg.Reports + cs.Detail = agg.Detail + } + out.Checks = append(out.Checks, cs) + } + return out +} diff --git a/internal/quorum/manager.go b/internal/quorum/manager.go new file mode 100644 index 0000000..16d3053 --- /dev/null +++ b/internal/quorum/manager.go @@ -0,0 +1,250 @@ +// Package quorum owns membership liveness and master election. +// +// Model +// +// - Membership N is the set of peers listed in cluster.yaml (every +// node, including self). +// - A peer is "live" if we have seen a heartbeat (sent or received) +// within the dead-after window. +// - Quorum is met when the live set's size is ≥ ⌈N/2⌉+1. +// - When quorum holds, the master is the live member with the +// lexicographically smallest NodeID. Otherwise the cluster has no +// master. +// - The term integer is bumped every time the elected master +// changes — including transitions to and from "no master". +// +// The rule is deliberately deterministic: every node that sees the +// same live set picks the same master, so there is no negotiation +// step and no split-brain window. +package quorum + +import ( + "context" + "sort" + "sync" + "time" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/transport" +) + +// Defaults for the heartbeat loop. The dead-after is comfortably +// above three missed beats so a transient blip never trips a master +// re-election. +const ( + DefaultHeartbeatInterval = 1 * time.Second + DefaultDeadAfter = 4 * time.Second +) + +// VersionObserver is invoked whenever a heartbeat exchange reveals +// that a peer carries a strictly greater cluster-config version than +// ours. The replication layer uses this to schedule a pull. +type VersionObserver func(peerID, peerAddr string, peerVersion uint64) + +// Manager coordinates heartbeats and master election for one node. +type Manager struct { + selfID string + cluster *config.ClusterConfig + client *transport.Client + + heartbeatInterval time.Duration + deadAfter time.Duration + + mu sync.RWMutex + term uint64 + masterID string + lastSeen map[string]time.Time // peerID -> last contact (sent or recv) + addrOf map[string]string // peerID -> advertise addr (last known) + + observer VersionObserver +} + +// New constructs a Manager bound to the given identity, cluster config, +// and RPC client. The Manager does not start any goroutines until +// Start is called. 
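+//
+// Election illustration (invented IDs): with peers {a, b, c} and only
+// {b, c} live, quorum (2 of 3) holds and b, the lexicographically
+// smallest live NodeID, becomes master; if c then drops too, quorum
+// is lost, masterID goes empty, and the term is bumped again.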
+func New(selfID string, cluster *config.ClusterConfig, client *transport.Client) *Manager { + return &Manager{ + selfID: selfID, + cluster: cluster, + client: client, + heartbeatInterval: DefaultHeartbeatInterval, + deadAfter: DefaultDeadAfter, + lastSeen: map[string]time.Time{}, + addrOf: map[string]string{}, + } +} + +// SetVersionObserver registers a callback fired when a peer reports a +// higher cluster-config version than ours. +func (m *Manager) SetVersionObserver(fn VersionObserver) { + m.observer = fn +} + +// Start spins up the heartbeat loop and the election ticker. +// Returns when ctx is cancelled. +func (m *Manager) Start(ctx context.Context) { + // Mark self live so a one-node cluster elects itself on tick zero. + m.markLive(m.selfID) + m.recomputeMaster() + + t := time.NewTicker(m.heartbeatInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + m.tick(ctx) + } + } +} + +// HandleHeartbeat is the inbound RPC handler. Records the sender as +// live and returns our current view of term, master, and version. +func (m *Manager) HandleHeartbeat(req transport.HeartbeatRequest) transport.HeartbeatResponse { + if req.FromNodeID != "" && req.FromNodeID != m.selfID { + m.markLive(req.FromNodeID) + m.maybeNotifyVersion(req.FromNodeID, req.Version) + } + m.recomputeMaster() + v := m.cluster.Snapshot().Version + return transport.HeartbeatResponse{ + NodeID: m.selfID, + Term: m.Term(), + MasterID: m.Master(), + Version: v, + } +} + +// Master returns the currently-elected master NodeID. Empty when the +// cluster has no quorum. +func (m *Manager) Master() string { + m.mu.RLock() + defer m.mu.RUnlock() + return m.masterID +} + +// IsMaster is a convenience predicate. +func (m *Manager) IsMaster() bool { + return m.Master() == m.selfID +} + +// Term returns the current election term. +func (m *Manager) Term() uint64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.term +} + +// HasQuorum reports whether the live set is large enough to elect a +// master. +func (m *Manager) HasQuorum() bool { + live := m.LiveSet() + return len(live) >= m.cluster.QuorumSize() +} + +// LiveSet returns a copy of the currently-live NodeIDs. +func (m *Manager) LiveSet() []string { + m.mu.RLock() + defer m.mu.RUnlock() + cutoff := time.Now().Add(-m.deadAfter) + out := make([]string, 0, len(m.lastSeen)+1) + for id, ts := range m.lastSeen { + if ts.After(cutoff) || id == m.selfID { + out = append(out, id) + } + } + sort.Strings(out) + return out +} + +// Liveness returns the peer ID → last_seen map snapshot for status. +func (m *Manager) Liveness() map[string]time.Time { + m.mu.RLock() + defer m.mu.RUnlock() + out := make(map[string]time.Time, len(m.lastSeen)) + for k, v := range m.lastSeen { + out[k] = v + } + return out +} + +// tick fires one round of heartbeats to all peers (except self) and +// then re-runs the election. 
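+//
+// Timing illustration with the defaults above: heartbeats go out every
+// second and a peer is only dropped from the live set after 4s of
+// silence, so roughly four consecutive missed beats (rather than one
+// slow round trip) are needed before the live set, and possibly the
+// master, changes.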
+func (m *Manager) tick(ctx context.Context) { + snap := m.cluster.Snapshot() + // remember addresses so we can dial peers even if cluster.yaml shifts + for _, p := range snap.Peers { + if p.NodeID != "" && p.Advertise != "" { + m.mu.Lock() + m.addrOf[p.NodeID] = p.Advertise + m.mu.Unlock() + } + } + + currentMaster := m.Master() + for _, p := range snap.Peers { + if p.NodeID == m.selfID || p.NodeID == "" || p.Advertise == "" { + continue + } + peerID, addr := p.NodeID, p.Advertise + + go func(peerID, addr string) { + callCtx, cancel := context.WithTimeout(ctx, m.heartbeatInterval) + defer cancel() + req := transport.HeartbeatRequest{ + FromNodeID: m.selfID, + Term: m.Term(), + MasterID: currentMaster, + Version: snap.Version, + } + var resp transport.HeartbeatResponse + if err := m.client.Call(callCtx, peerID, addr, + transport.MethodHeartbeat, req, &resp); err != nil { + return + } + m.markLive(peerID) + m.maybeNotifyVersion(peerID, resp.Version) + }(peerID, addr) + } + + m.markLive(m.selfID) + m.recomputeMaster() +} + +func (m *Manager) markLive(id string) { + m.mu.Lock() + m.lastSeen[id] = time.Now() + m.mu.Unlock() +} + +func (m *Manager) maybeNotifyVersion(peerID string, peerVer uint64) { + if m.observer == nil { + return + } + local := m.cluster.Snapshot().Version + if peerVer <= local { + return + } + m.mu.RLock() + addr := m.addrOf[peerID] + m.mu.RUnlock() + m.observer(peerID, addr, peerVer) +} + +func (m *Manager) recomputeMaster() { + live := m.LiveSet() + quorum := m.cluster.QuorumSize() + + m.mu.Lock() + defer m.mu.Unlock() + + var newMaster string + if len(live) >= quorum && len(live) > 0 { + newMaster = live[0] // lowest NodeID wins + } + if newMaster != m.masterID { + m.term++ + m.masterID = newMaster + } +} diff --git a/internal/replicate/replicator.go b/internal/replicate/replicator.go new file mode 100644 index 0000000..83a86f2 --- /dev/null +++ b/internal/replicate/replicator.go @@ -0,0 +1,293 @@ +// Package replicate keeps cluster.yaml consistent across every node. +// +// Every mutation goes through the elected master. Followers either +// +// - forward a CLI-originated proposal to the master via the +// ProposeMutation RPC, then trust the master's broadcast to +// update their local copy; or +// - observe via heartbeat that the master holds a higher version +// and pull the new snapshot on their own. +// +// The master applies, bumps the monotonic Version counter, and +// pushes the new snapshot to every peer. Peers accept the snapshot +// only when its Version is strictly greater than their local Version. +// +// The package owns no transport or quorum logic — it consumes those +// through small interfaces so it stays easy to test. +package replicate + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/transport" +) + +// MasterView is the minimum the replicator needs from the quorum +// manager: who the current master is and what address to reach them +// at. Implemented by *quorum.Manager. +type MasterView interface { + Master() string + IsMaster() bool + HasQuorum() bool +} + +// Replicator drives mutation routing and broadcast. +type Replicator struct { + selfID string + cluster *config.ClusterConfig + client *transport.Client + master MasterView +} + +// New constructs a replicator. selfID is this node's NodeID. 
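+//
+// End-to-end illustration (versions invented): an operator on follower
+// F adds a check; the control plane calls LocalMutate, which forwards
+// a ProposeMutation RPC to master M; M applies it, bumps Version from
+// 7 to 8, and broadcasts ApplyClusterCfg; F and every other peer
+// accept the snapshot because 8 > 7.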
+func New(selfID string, cluster *config.ClusterConfig, client *transport.Client, master MasterView) *Replicator { + return &Replicator{ + selfID: selfID, + cluster: cluster, + client: client, + master: master, + } +} + +// LocalMutate is called by the CLI/control plane on this node to +// effect a config change. It routes the mutation to the master and +// returns the new version on success. +func (r *Replicator) LocalMutate(ctx context.Context, kind transport.MutationKind, payload any) (uint64, error) { + if !r.master.HasQuorum() { + return 0, errors.New("no quorum: refusing mutation") + } + + raw, err := json.Marshal(payload) + if err != nil { + return 0, fmt.Errorf("marshal payload: %w", err) + } + + if r.master.IsMaster() { + // apply directly, then broadcast. + newVer, err := r.applyLocally(kind, raw) + if err != nil { + return 0, err + } + r.broadcast(ctx) + return newVer, nil + } + + // follower: ship to master. + masterID := r.master.Master() + if masterID == "" { + return 0, errors.New("master unknown") + } + addr := r.addressOf(masterID) + if addr == "" { + return 0, fmt.Errorf("master %s has no advertise address", masterID) + } + req := transport.ProposeMutationRequest{ + FromNodeID: r.selfID, + Kind: kind, + Payload: raw, + } + var resp transport.ProposeMutationResponse + if err := r.client.Call(ctx, masterID, addr, transport.MethodProposeMutation, req, &resp); err != nil { + return 0, fmt.Errorf("propose to master: %w", err) + } + if resp.Error != "" { + return 0, errors.New(resp.Error) + } + return resp.NewVersion, nil +} + +// HandleProposeMutation is the inbound RPC handler. Only meaningful +// when this node is the master; followers reject the proposal so the +// caller knows to re-route. +func (r *Replicator) HandleProposeMutation(ctx context.Context, req transport.ProposeMutationRequest) transport.ProposeMutationResponse { + if !r.master.IsMaster() { + return transport.ProposeMutationResponse{Error: "not master"} + } + if !r.master.HasQuorum() { + return transport.ProposeMutationResponse{Error: "no quorum"} + } + newVer, err := r.applyLocally(req.Kind, req.Payload) + if err != nil { + return transport.ProposeMutationResponse{Error: err.Error()} + } + go r.broadcast(context.Background()) + return transport.ProposeMutationResponse{NewVersion: newVer} +} + +// HandleApplyClusterCfg is the inbound RPC handler for a master +// broadcast. Applies if the version is strictly newer than ours. +func (r *Replicator) HandleApplyClusterCfg(req transport.ApplyClusterCfgRequest) transport.ApplyClusterCfgResponse { + applied := false + if req.Config != nil { + ok, err := r.cluster.Replace(req.Config) + if err == nil && ok { + applied = true + } + } + return transport.ApplyClusterCfgResponse{ + Applied: applied, + Version: r.cluster.Snapshot().Version, + } +} + +// HandleGetClusterCfg returns a snapshot of our cluster.yaml. Used by +// followers performing catch-up against the master. +func (r *Replicator) HandleGetClusterCfg() transport.GetClusterCfgResponse { + return transport.GetClusterCfgResponse{Config: r.cluster.Snapshot()} +} + +// PullFrom fetches the cluster config from a peer and applies it. +// Wired to the quorum manager's VersionObserver. 
+func (r *Replicator) PullFrom(ctx context.Context, peerID, addr string) error { + if peerID == "" || addr == "" { + return errors.New("pull: empty peer") + } + var resp transport.GetClusterCfgResponse + if err := r.client.Call(ctx, peerID, addr, transport.MethodGetClusterCfg, transport.GetClusterCfgRequest{}, &resp); err != nil { + return fmt.Errorf("pull from %s: %w", peerID, err) + } + if resp.Config == nil { + return errors.New("pull: empty config") + } + _, err := r.cluster.Replace(resp.Config) + return err +} + +// broadcast pushes the current cluster config to all peers. Called by +// master after a successful mutation. +func (r *Replicator) broadcast(ctx context.Context) { + snap := r.cluster.Snapshot() + for _, p := range snap.Peers { + if p.NodeID == r.selfID || p.NodeID == "" || p.Advertise == "" { + continue + } + peerID, addr := p.NodeID, p.Advertise + go func(peerID, addr string) { + callCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + req := transport.ApplyClusterCfgRequest{Config: snap} + var resp transport.ApplyClusterCfgResponse + _ = r.client.Call(callCtx, peerID, addr, transport.MethodApplyClusterCfg, req, &resp) + }(peerID, addr) + } +} + +func (r *Replicator) addressOf(nodeID string) string { + for _, p := range r.cluster.Snapshot().Peers { + if p.NodeID == nodeID { + return p.Advertise + } + } + return "" +} + +// applyLocally decodes the mutation payload and applies it. Only the +// master should call this directly. Returns the new version. +func (r *Replicator) applyLocally(kind transport.MutationKind, payload json.RawMessage) (uint64, error) { + apply := func(c *config.ClusterConfig) error { + switch kind { + case transport.MutationAddCheck: + var ch config.Check + if err := json.Unmarshal(payload, &ch); err != nil { + return fmt.Errorf("decode check: %w", err) + } + if ch.ID == "" || ch.Name == "" { + return errors.New("check needs id and name") + } + for i, existing := range c.Checks { + if existing.ID == ch.ID || existing.Name == ch.Name { + c.Checks[i] = ch + return nil + } + } + c.Checks = append(c.Checks, ch) + return nil + + case transport.MutationRemoveCheck: + var target string + if err := json.Unmarshal(payload, &target); err != nil { + return fmt.Errorf("decode target: %w", err) + } + for i, existing := range c.Checks { + if existing.ID == target || existing.Name == target { + c.Checks = append(c.Checks[:i], c.Checks[i+1:]...) + return nil + } + } + return fmt.Errorf("no such check %q", target) + + case transport.MutationAddAlert: + var a config.Alert + if err := json.Unmarshal(payload, &a); err != nil { + return fmt.Errorf("decode alert: %w", err) + } + if a.ID == "" || a.Name == "" { + return errors.New("alert needs id and name") + } + for i, existing := range c.Alerts { + if existing.ID == a.ID || existing.Name == a.Name { + c.Alerts[i] = a + return nil + } + } + c.Alerts = append(c.Alerts, a) + return nil + + case transport.MutationRemoveAlert: + var target string + if err := json.Unmarshal(payload, &target); err != nil { + return fmt.Errorf("decode target: %w", err) + } + for i, existing := range c.Alerts { + if existing.ID == target || existing.Name == target { + c.Alerts = append(c.Alerts[:i], c.Alerts[i+1:]...) 
+ return nil + } + } + return fmt.Errorf("no such alert %q", target) + + case transport.MutationAddPeer: + var p config.PeerInfo + if err := json.Unmarshal(payload, &p); err != nil { + return fmt.Errorf("decode peer: %w", err) + } + if p.NodeID == "" { + return errors.New("peer needs node_id") + } + for i, existing := range c.Peers { + if existing.NodeID == p.NodeID { + c.Peers[i] = p + return nil + } + } + c.Peers = append(c.Peers, p) + return nil + + case transport.MutationRemovePeer: + var target string + if err := json.Unmarshal(payload, &target); err != nil { + return fmt.Errorf("decode target: %w", err) + } + for i, existing := range c.Peers { + if existing.NodeID == target { + c.Peers = append(c.Peers[:i], c.Peers[i+1:]...) + return nil + } + } + return fmt.Errorf("no such peer %q", target) + + default: + return fmt.Errorf("unknown mutation kind %q", kind) + } + } + + if err := r.cluster.Mutate(r.selfID, apply); err != nil { + return 0, err + } + return r.cluster.Snapshot().Version, nil +} diff --git a/internal/transport/frame.go b/internal/transport/frame.go new file mode 100644 index 0000000..3b60111 --- /dev/null +++ b/internal/transport/frame.go @@ -0,0 +1,45 @@ +package transport + +import ( + "encoding/binary" + "errors" + "io" +) + +// MaxFrameSize caps the size of an individual on-wire message. 16 MiB +// is comfortably above any plausible cluster.yaml or status payload +// and rejects malicious giants up front. +const MaxFrameSize = 16 * 1024 * 1024 + +// writeFrame emits a single length-prefixed message: 4-byte big-endian +// length followed by the body. +func writeFrame(w io.Writer, body []byte) error { + if len(body) > MaxFrameSize { + return errors.New("frame too large") + } + var hdr [4]byte + binary.BigEndian.PutUint32(hdr[:], uint32(len(body))) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + _, err := w.Write(body) + return err +} + +// readFrame reads the next length-prefixed message. Returns io.EOF +// cleanly when the connection closes on a frame boundary. +func readFrame(r io.Reader) ([]byte, error) { + var hdr [4]byte + if _, err := io.ReadFull(r, hdr[:]); err != nil { + return nil, err + } + n := binary.BigEndian.Uint32(hdr[:]) + if n > MaxFrameSize { + return nil, errors.New("incoming frame exceeds MaxFrameSize") + } + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, err + } + return buf, nil +} diff --git a/internal/transport/messages.go b/internal/transport/messages.go new file mode 100644 index 0000000..9880449 --- /dev/null +++ b/internal/transport/messages.go @@ -0,0 +1,193 @@ +// Package transport carries inter-node RPC over mTLS. It owns three +// concerns and nothing else: +// +// 1. Building tls.Config values that pin peer certs against the local +// trust store (server and client side). +// 2. Length-prefixed JSON framing on top of the TLS connection. +// 3. A tiny method-dispatch RPC: callers register handlers by method +// name; remote peers invoke them via Client.Call. +// +// Higher-level concerns (heartbeats, quorum, replication, check +// shipping) live in their own packages and use this one purely as a +// pipe. That keeps the wire format easy to reason about and the +// surrounding packages testable without a real network. +package transport + +import ( + "encoding/json" + "time" + + "github.com/jasper/quptime/internal/config" +) + +// Method names. Defined here so every package agrees on the wire-level +// identifier without importing each other. 
+const ( + MethodPing = "Ping" + MethodWhoAmI = "WhoAmI" + MethodJoin = "Join" + MethodHeartbeat = "Heartbeat" + MethodGetClusterCfg = "GetClusterCfg" + MethodApplyClusterCfg = "ApplyClusterCfg" + MethodProposeMutation = "ProposeMutation" + MethodReportResult = "ReportResult" + MethodStatus = "Status" +) + +// PingRequest is an empty liveness probe. PingResponse carries the +// responder's wall clock so the caller can sanity-check drift. +type PingRequest struct{} + +// PingResponse is returned by MethodPing. +type PingResponse struct { + NodeID string `json:"node_id"` + Now time.Time `json:"now"` +} + +// WhoAmIRequest asks the remote node to identify itself. Used during +// the TOFU handshake before the caller commits a trust entry. +type WhoAmIRequest struct{} + +// WhoAmIResponse carries the node's identity. The fingerprint is +// recomputed by the caller from the TLS cert and compared against the +// claim here as a defense-in-depth check. +type WhoAmIResponse struct { + NodeID string `json:"node_id"` + Advertise string `json:"advertise"` + Fingerprint string `json:"fingerprint"` + CertPEM string `json:"cert_pem"` +} + +// JoinRequest is sent by a node that has just learned the remote's +// fingerprint out of band and wants the remote to record this node in +// its own trust store too (so the relationship is symmetric). +type JoinRequest struct { + NodeID string `json:"node_id"` + Advertise string `json:"advertise"` + Fingerprint string `json:"fingerprint"` + CertPEM string `json:"cert_pem"` +} + +// JoinResponse echoes a non-empty Error string when the remote refuses +// the join (e.g. operator declined the prompt or fingerprint mismatch). +type JoinResponse struct { + Accepted bool `json:"accepted"` + Error string `json:"error,omitempty"` +} + +// HeartbeatRequest is the periodic liveness ping sent over the +// inter-node channel. It also carries the sender's view of who the +// master is, so disagreements surface quickly. +type HeartbeatRequest struct { + FromNodeID string `json:"from_node_id"` + Term uint64 `json:"term"` + MasterID string `json:"master_id"` + Version uint64 `json:"config_version"` +} + +// HeartbeatResponse is returned by MethodHeartbeat. +type HeartbeatResponse struct { + NodeID string `json:"node_id"` + Term uint64 `json:"term"` + MasterID string `json:"master_id"` + Version uint64 `json:"config_version"` +} + +// GetClusterCfgRequest fetches the responder's view of cluster.yaml. +// Used by stale followers to pull the canonical config from master. +type GetClusterCfgRequest struct{} + +// GetClusterCfgResponse contains a cluster.yaml snapshot. +type GetClusterCfgResponse struct { + Config *config.ClusterConfig `json:"config"` +} + +// ApplyClusterCfgRequest is the master pushing a new replicated config +// to a follower. The follower applies only if Version is strictly +// greater than its local Version. +type ApplyClusterCfgRequest struct { + Config *config.ClusterConfig `json:"config"` +} + +// ApplyClusterCfgResponse acknowledges with whether the follower +// stored the new config. +type ApplyClusterCfgResponse struct { + Applied bool `json:"applied"` + Version uint64 `json:"current_version"` +} + +// MutationKind enumerates the cluster-config edit operations that +// followers forward to the master. 
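+//
+// Payload conventions, for illustration: the "add_*" kinds carry the
+// JSON encoding of the entity itself (config.Check, config.Alert and
+// config.PeerInfo carry only yaml tags, so encoding/json falls back to
+// the exported Go field names), while removals carry a bare JSON
+// string naming the target:
+//
+//    add_check    -> {"ID":"web","Name":"public website","Type":"http","Target":"https://example.com", ...}
+//    remove_check -> "web"
+//    remove_peer  -> "node-b"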
+type MutationKind string + +const ( + MutationAddCheck MutationKind = "add_check" + MutationRemoveCheck MutationKind = "remove_check" + MutationAddAlert MutationKind = "add_alert" + MutationRemoveAlert MutationKind = "remove_alert" + MutationAddPeer MutationKind = "add_peer" + MutationRemovePeer MutationKind = "remove_peer" +) + +// ProposeMutationRequest is a follower-to-master message. The payload +// is the JSON-encoded body of the new entity (a Check, an Alert, or a +// PeerInfo) for the "add" variants, or the target ID/NodeID string for +// removals. +type ProposeMutationRequest struct { + FromNodeID string `json:"from_node_id"` + Kind MutationKind `json:"kind"` + Payload json.RawMessage `json:"payload"` +} + +// ProposeMutationResponse is the master's reply to ProposeMutation. +type ProposeMutationResponse struct { + NewVersion uint64 `json:"new_version"` + Error string `json:"error,omitempty"` +} + +// ReportResultRequest is a follower-to-master message reporting the +// outcome of a single local probe. +type ReportResultRequest struct { + FromNodeID string `json:"from_node_id"` + CheckID string `json:"check_id"` + OK bool `json:"ok"` + Detail string `json:"detail,omitempty"` + LatencyMS int64 `json:"latency_ms"` + At time.Time `json:"at"` +} + +// ReportResultResponse acknowledges a result. Empty body for now. +type ReportResultResponse struct{} + +// StatusRequest asks a peer for its operational state. +type StatusRequest struct{} + +// StatusResponse is what `qu status` aggregates and displays. +type StatusResponse struct { + NodeID string `json:"node_id"` + Term uint64 `json:"term"` + MasterID string `json:"master_id"` + Version uint64 `json:"config_version"` + Peers []PeerLiveness `json:"peers"` + Checks []CheckSnapshot `json:"checks"` + HasQuorum bool `json:"has_quorum"` + QuorumSize int `json:"quorum_size"` +} + +// PeerLiveness summarises one peer for status output. +type PeerLiveness struct { + NodeID string `json:"node_id"` + Advertise string `json:"advertise"` + Live bool `json:"live"` + LastSeen time.Time `json:"last_seen"` +} + +// CheckSnapshot is the aggregate state of one configured check. +type CheckSnapshot struct { + CheckID string `json:"check_id"` + Name string `json:"name"` + State string `json:"state"` // "up", "down", "unknown" + OKCount int `json:"ok_count"` + Total int `json:"total"` + Detail string `json:"detail,omitempty"` +} diff --git a/internal/transport/rpc.go b/internal/transport/rpc.go new file mode 100644 index 0000000..2170195 --- /dev/null +++ b/internal/transport/rpc.go @@ -0,0 +1,329 @@ +package transport + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +// HandlerFunc is registered by callers for a specific method name. The +// raw JSON request body and the peer's verified node ID are provided. +// The returned value (if any) is JSON-marshalled into the response. +type HandlerFunc func(ctx context.Context, peerNodeID string, payload json.RawMessage) (any, error) + +// Server is a registry of method handlers plus an accept loop. It +// owns no business logic; callers register methods and Serve dispatches. +type Server struct { + assets *TLSAssets + handlers map[string]HandlerFunc + + mu sync.Mutex + ln net.Listener + conns map[net.Conn]struct{} +} + +// NewServer constructs a Server with no handlers registered. 
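+//
+// A minimal wiring sketch (illustrative; the daemon package does the
+// real registration):
+//
+//    srv := transport.NewServer(assets)
+//    srv.Handle(transport.MethodPing, func(_ context.Context, peer string, _ json.RawMessage) (any, error) {
+//        return transport.PingResponse{NodeID: selfID, Now: time.Now().UTC()}, nil
+//    })
+//    go func() { _ = srv.Serve(ctx, "0.0.0.0:9001") }()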
+func NewServer(assets *TLSAssets) *Server { + return &Server{ + assets: assets, + handlers: map[string]HandlerFunc{}, + conns: map[net.Conn]struct{}{}, + } +} + +// Handle registers fn for the given method name. Replaces any prior +// handler for the same method. +func (s *Server) Handle(method string, fn HandlerFunc) { + s.handlers[method] = fn +} + +// Serve binds the listener at addr and dispatches incoming RPCs until +// Stop is called or the listener errors out. +func (s *Server) Serve(ctx context.Context, addr string) error { + tlsCfg, err := s.assets.ServerConfig() + if err != nil { + return err + } + ln, err := tls.Listen("tcp", addr, tlsCfg) + if err != nil { + return fmt.Errorf("listen %s: %w", addr, err) + } + s.mu.Lock() + s.ln = ln + s.mu.Unlock() + + go func() { + <-ctx.Done() + _ = ln.Close() + }() + + for { + conn, err := ln.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + go s.handleConn(ctx, conn) + } +} + +// Stop closes the listener and all in-flight connections. Safe to call +// from any goroutine. +func (s *Server) Stop() { + s.mu.Lock() + if s.ln != nil { + _ = s.ln.Close() + } + for c := range s.conns { + _ = c.Close() + } + s.conns = map[net.Conn]struct{}{} + s.mu.Unlock() +} + +func (s *Server) trackConn(c net.Conn) { s.mu.Lock(); s.conns[c] = struct{}{}; s.mu.Unlock() } +func (s *Server) untrackConn(c net.Conn) { s.mu.Lock(); delete(s.conns, c); s.mu.Unlock() } + +func (s *Server) handleConn(ctx context.Context, raw net.Conn) { + s.trackConn(raw) + defer func() { + s.untrackConn(raw) + _ = raw.Close() + }() + + tlsConn, ok := raw.(*tls.Conn) + if !ok { + return + } + if err := tlsConn.HandshakeContext(ctx); err != nil { + return + } + peerID := peerNodeIDFromConnState(tlsConn.ConnectionState()) + + for { + body, err := readFrame(tlsConn) + if err != nil { + return + } + var req requestEnvelope + if err := json.Unmarshal(body, &req); err != nil { + _ = writeError(tlsConn, 0, "decode request: "+err.Error()) + return + } + + fn, exists := s.handlers[req.Method] + if !exists { + _ = writeError(tlsConn, req.ID, "unknown method: "+req.Method) + continue + } + + result, err := fn(ctx, peerID, req.Params) + if err != nil { + _ = writeError(tlsConn, req.ID, err.Error()) + continue + } + if err := writeResult(tlsConn, req.ID, result); err != nil { + return + } + } +} + +// Client opens and pools one mTLS connection per peer node ID. Each +// connection serialises outstanding calls under a mutex; concurrent +// calls to different peers proceed in parallel. +type Client struct { + assets *TLSAssets + + mu sync.Mutex + conns map[string]*clientConn // by peer node ID + + nextID atomic.Uint64 +} + +// NewClient constructs an empty connection pool. +func NewClient(assets *TLSAssets) *Client { + return &Client{assets: assets, conns: map[string]*clientConn{}} +} + +type clientConn struct { + mu sync.Mutex + conn *tls.Conn +} + +// Close drops every pooled connection. Safe to call multiple times. +func (c *Client) Close() { + c.mu.Lock() + defer c.mu.Unlock() + for id, cc := range c.conns { + if cc.conn != nil { + _ = cc.conn.Close() + } + delete(c.conns, id) + } +} + +// Call invokes method on the peer at addr (identified by nodeID for +// fingerprint pinning), marshalling params to JSON and unmarshalling +// the result into out. out may be nil if the caller doesn't care. 
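// A rough sketch of a liveness probe against a previously trusted peer. The
// peer ID and address literals are placeholders, and proto stands for the
// package holding the message types; neither is defined by this change.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	var pong proto.PingResponse
//	if err := client.Call(ctx, "node-b", "10.0.0.2:7474", proto.MethodPing, proto.PingRequest{}, &pong); err != nil {
//	    // The pooled connection to node-b is dropped on failure, so the
//	    // next Call redials and re-verifies the pinned fingerprint.
//	    return err
//	}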
+func (c *Client) Call(ctx context.Context, nodeID, addr, method string, params any, out any) error { + cc, err := c.getConn(ctx, nodeID, addr) + if err != nil { + return err + } + if err := c.callOn(ctx, cc, method, params, out); err != nil { + // drop the connection on error so the next call reconnects fresh + c.dropConn(nodeID) + return err + } + return nil +} + +func (c *Client) callOn(ctx context.Context, cc *clientConn, method string, params any, out any) error { + paramsJSON, err := json.Marshal(params) + if err != nil { + return fmt.Errorf("marshal params: %w", err) + } + id := c.nextID.Add(1) + env := requestEnvelope{ID: id, Method: method, Params: paramsJSON} + body, err := json.Marshal(env) + if err != nil { + return err + } + + cc.mu.Lock() + defer cc.mu.Unlock() + + if dl, ok := ctx.Deadline(); ok { + _ = cc.conn.SetDeadline(dl) + defer func() { _ = cc.conn.SetDeadline(time.Time{}) }() + } + + if err := writeFrame(cc.conn, body); err != nil { + return err + } + respBody, err := readFrame(cc.conn) + if err != nil { + return err + } + var resp responseEnvelope + if err := json.Unmarshal(respBody, &resp); err != nil { + return fmt.Errorf("decode response: %w", err) + } + if resp.Error != "" { + return fmt.Errorf("remote: %s", resp.Error) + } + if out != nil && len(resp.Result) > 0 { + if err := json.Unmarshal(resp.Result, out); err != nil { + return fmt.Errorf("decode result: %w", err) + } + } + return nil +} + +func (c *Client) getConn(ctx context.Context, nodeID, addr string) (*clientConn, error) { + c.mu.Lock() + cc, ok := c.conns[nodeID] + c.mu.Unlock() + if ok && cc.conn != nil { + return cc, nil + } + + tlsCfg, err := c.assets.ClientConfig(nodeID) + if err != nil { + return nil, err + } + d := tls.Dialer{Config: tlsCfg} + raw, err := d.DialContext(ctx, "tcp", addr) + if err != nil { + return nil, fmt.Errorf("dial %s: %w", addr, err) + } + tc, ok := raw.(*tls.Conn) + if !ok { + _ = raw.Close() + return nil, errors.New("dial returned non-tls conn") + } + cc = &clientConn{conn: tc} + c.mu.Lock() + if existing, ok := c.conns[nodeID]; ok && existing.conn != nil { + // concurrent dial — drop ours, reuse existing + _ = tc.Close() + c.mu.Unlock() + return existing, nil + } + c.conns[nodeID] = cc + c.mu.Unlock() + return cc, nil +} + +func (c *Client) dropConn(nodeID string) { + c.mu.Lock() + if cc, ok := c.conns[nodeID]; ok { + if cc.conn != nil { + _ = cc.conn.Close() + } + delete(c.conns, nodeID) + } + c.mu.Unlock() +} + +// requestEnvelope is the wire shape of an RPC request frame. +type requestEnvelope struct { + ID uint64 `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +// responseEnvelope is the wire shape of an RPC response frame. +type responseEnvelope struct { + ID uint64 `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +func writeResult(w io.Writer, id uint64, result any) error { + var raw json.RawMessage + if result != nil { + b, err := json.Marshal(result) + if err != nil { + return writeError(w, id, "marshal result: "+err.Error()) + } + raw = b + } + body, err := json.Marshal(responseEnvelope{ID: id, Result: raw}) + if err != nil { + return err + } + return writeFrame(w, body) +} + +func writeError(w io.Writer, id uint64, msg string) error { + body, err := json.Marshal(responseEnvelope{ID: id, Error: msg}) + if err != nil { + return err + } + return writeFrame(w, body) +} + +// peerNodeIDFromConnState extracts the peer's NodeID from the cert's +// CommonName field. 
+// The init flow sets CN to the local NodeID. +func peerNodeIDFromConnState(cs tls.ConnectionState) string { + if len(cs.PeerCertificates) == 0 { + return "" + } + return cs.PeerCertificates[0].Subject.CommonName +} + +// fingerprintOf (defined in tofu.go) recomputes the SPKI hash locally +// so this package stays independent of the internal crypto package; the +// blank x509 reference below only keeps this file's import in use. +var _ = (*x509.Certificate)(nil) diff --git a/internal/transport/tls.go b/internal/transport/tls.go new file mode 100644 index 0000000..d34d664 --- /dev/null +++ b/internal/transport/tls.go @@ -0,0 +1,120 @@ +package transport + +import ( + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + + "github.com/jasper/quptime/internal/trust" +) + +// MinTLS is the minimum protocol version both sides require. +const MinTLS = tls.VersionTLS13 + +// TLSAssets bundles the on-disk material needed to spin up either a +// listener or a dialer. Build it once at daemon start and pass to +// ServerConfig / ClientConfig. +type TLSAssets struct { + Cert []byte // PEM-encoded leaf cert + Key *rsa.PrivateKey + Trust *trust.Store +} + +// tlsCert wraps the local PEM cert + RSA key into a tls.Certificate. +func (a *TLSAssets) tlsCert() (tls.Certificate, error) { + block, _ := pem.Decode(a.Cert) + if block == nil { + return tls.Certificate{}, errors.New("cert PEM has no block") + } + leaf, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return tls.Certificate{}, fmt.Errorf("parse leaf: %w", err) + } + return tls.Certificate{ + Certificate: [][]byte{block.Bytes}, + PrivateKey: a.Key, + Leaf: leaf, + }, nil +} + +// ServerConfig produces a tls.Config suitable for an inter-node +// listener. Peers must present a certificate, and that certificate's +// fingerprint must already be present in the trust store. +func (a *TLSAssets) ServerConfig() (*tls.Config, error) { + cert, err := a.tlsCert() + if err != nil { + return nil, err + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: MinTLS, + ClientAuth: tls.RequireAnyClientCert, + InsecureSkipVerify: true, // we do our own pinning via VerifyPeerCertificate + VerifyPeerCertificate: a.Trust.VerifyPeerCert, + }, nil +} + +// ClientConfig produces a tls.Config suitable for dialing a peer. +// expectedNodeID is optional: if non-empty, the handshake also +// verifies that the cert's fingerprint matches the trust entry for +// that node ID. +func (a *TLSAssets) ClientConfig(expectedNodeID string) (*tls.Config, error) { + cert, err := a.tlsCert() + if err != nil { + return nil, err + } + verify := a.Trust.VerifyPeerCert + if expectedNodeID != "" { + verify = a.makeStrictVerifier(expectedNodeID) + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: MinTLS, + InsecureSkipVerify: true, // we do our own pinning via VerifyPeerCertificate + VerifyPeerCertificate: verify, + }, nil +} + +// InsecureBootstrapConfig is the client-side TLS config used only by +// the TOFU prefetch (FetchPeerCert). It accepts any peer cert because +// the caller has not yet established trust; the certificate is +// surfaced to the operator for manual approval before being added to +// the store. Never use this anywhere else.
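// A rough sketch of how the two pinned configs above are normally consumed;
// certPEM and key stand for whatever the daemon's init flow loads from disk
// and are assumed names, not definitions from this change.
//
//	store, err := trust.Load()
//	if err != nil {
//	    return err
//	}
//	assets := &TLSAssets{Cert: certPEM, Key: key, Trust: store}
//	rpcServer := NewServer(assets) // ServerConfig gates every inbound peer
//	rpcClient := NewClient(assets) // ClientConfig(nodeID) pins outbound dials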
+func (a *TLSAssets) InsecureBootstrapConfig() (*tls.Config, error) { + cert, err := a.tlsCert() + if err != nil { + return nil, err + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: MinTLS, + InsecureSkipVerify: true, + }, nil +} + +// makeStrictVerifier returns a VerifyPeerCertificate callback that +// pins the connection to the trust entry of a specific node ID. +func (a *TLSAssets) makeStrictVerifier(expectedNodeID string) func([][]byte, [][]*x509.Certificate) error { + return func(rawCerts [][]byte, _ [][]*x509.Certificate) error { + if len(rawCerts) == 0 { + return errors.New("peer presented no certificate") + } + cert, err := x509.ParseCertificate(rawCerts[0]) + if err != nil { + return fmt.Errorf("parse peer cert: %w", err) + } + entry, ok := a.Trust.Get(expectedNodeID) + if !ok { + return fmt.Errorf("no trust entry for node %s", expectedNodeID) + } + got := fingerprintOf(cert) + if got != entry.Fingerprint { + return fmt.Errorf("fingerprint mismatch for %s: got %s want %s", + expectedNodeID, got, entry.Fingerprint) + } + return nil + } +} diff --git a/internal/transport/tofu.go b/internal/transport/tofu.go new file mode 100644 index 0000000..82ada77 --- /dev/null +++ b/internal/transport/tofu.go @@ -0,0 +1,71 @@ +package transport + +import ( + "context" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "encoding/hex" + "encoding/pem" + "errors" + "fmt" + "net" + "time" +) + +// fingerprintOf computes the SHA-256 SPKI fingerprint of a parsed +// certificate using the same encoding as the crypto package +// (sha256:hex). Duplicated here to keep the transport package +// dependency-light at the call site. +func fingerprintOf(cert *x509.Certificate) string { + sum := sha256.Sum256(cert.RawSubjectPublicKeyInfo) + return "sha256:" + hex.EncodeToString(sum[:]) +} + +// PeerCertSample is the result of a TOFU probe: the operator inspects +// the fingerprint and decides whether to trust it. +type PeerCertSample struct { + Cert *x509.Certificate + CertPEM []byte + Fingerprint string +} + +// FetchPeerCert opens an mTLS connection to addr with no trust +// pinning, captures the peer's certificate, and closes the connection. +// The caller must show the fingerprint to the operator before adding +// it to the trust store. +// +// This is the *only* place the trust store is bypassed. After the +// TOFU exchange, the regular ClientConfig path applies for all future +// traffic to that peer. 
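// A rough sketch of the TOFU sequence around the function below; store,
// confirm (the operator prompt), peerNodeID and the address literal are
// assumed for illustration and are not defined by this change.
//
//	sample, err := FetchPeerCert(ctx, assets, "10.0.0.2:7474")
//	if err != nil {
//	    return err
//	}
//	fmt.Printf("peer presents fingerprint %s\n", sample.Fingerprint)
//	if !confirm() {
//	    return errors.New("operator declined fingerprint")
//	}
//	return store.Add(trust.Entry{
//	    NodeID:      peerNodeID, // e.g. claimed via the WhoAmI handshake
//	    Address:     "10.0.0.2:7474",
//	    Fingerprint: sample.Fingerprint,
//	    CertPEM:     string(sample.CertPEM),
//	})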
+func FetchPeerCert(ctx context.Context, assets *TLSAssets, addr string) (*PeerCertSample, error) { + cfg, err := assets.InsecureBootstrapConfig() + if err != nil { + return nil, err + } + dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + d := tls.Dialer{Config: cfg, NetDialer: &net.Dialer{}} + raw, err := d.DialContext(dialCtx, "tcp", addr) + if err != nil { + return nil, fmt.Errorf("dial %s: %w", addr, err) + } + defer raw.Close() + + tc, ok := raw.(*tls.Conn) + if !ok { + return nil, errors.New("dial returned non-tls conn") + } + state := tc.ConnectionState() + if len(state.PeerCertificates) == 0 { + return nil, errors.New("peer presented no certificate") + } + leaf := state.PeerCertificates[0] + pemBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: leaf.Raw}) + return &PeerCertSample{ + Cert: leaf, + CertPEM: pemBytes, + Fingerprint: fingerprintOf(leaf), + }, nil +} diff --git a/internal/trust/store.go b/internal/trust/store.go new file mode 100644 index 0000000..250f570 --- /dev/null +++ b/internal/trust/store.go @@ -0,0 +1,159 @@ +// Package trust is the local fingerprint trust store. Every inbound +// or outbound mTLS connection must present a peer cert whose +// fingerprint is recorded here, otherwise it is refused. +// +// The trust store is NOT replicated automatically by the cluster +// config: each node's operator confirms each new peer's fingerprint +// out of band (the "trust on first use" prompt). After confirmation, +// the cluster replication layer is free to propagate the peer's +// public material to other nodes — but it cannot grant trust on +// behalf of a node that has not yet trusted it. +package trust + +import ( + "crypto/x509" + "errors" + "fmt" + "os" + "sync" + "time" + + "gopkg.in/yaml.v3" + + "github.com/jasper/quptime/internal/config" + "github.com/jasper/quptime/internal/crypto" +) + +// Entry is one trusted peer. +type Entry struct { + NodeID string `yaml:"node_id"` + Address string `yaml:"address"` + Fingerprint string `yaml:"fingerprint"` + PublicKeyPEM string `yaml:"public_key_pem"` + CertPEM string `yaml:"cert_pem,omitempty"` + AddedAt time.Time `yaml:"added_at"` +} + +// Store is the persistent trust list with an in-memory cache. +type Store struct { + mu sync.RWMutex + entries map[string]Entry // keyed by NodeID +} + +// Load reads trust.yaml. A missing file yields an empty store. +func Load() (*Store, error) { + s := &Store{entries: map[string]Entry{}} + raw, err := os.ReadFile(config.TrustFilePath()) + if err != nil { + if os.IsNotExist(err) { + return s, nil + } + return nil, err + } + wrap := struct { + Entries []Entry `yaml:"entries"` + }{} + if err := yaml.Unmarshal(raw, &wrap); err != nil { + return nil, fmt.Errorf("parse trust.yaml: %w", err) + } + for _, e := range wrap.Entries { + s.entries[e.NodeID] = e + } + return s, nil +} + +// Save writes trust.yaml atomically. +func (s *Store) Save() error { + s.mu.RLock() + defer s.mu.RUnlock() + list := make([]Entry, 0, len(s.entries)) + for _, e := range s.entries { + list = append(list, e) + } + out, err := yaml.Marshal(struct { + Entries []Entry `yaml:"entries"` + }{Entries: list}) + if err != nil { + return err + } + return config.AtomicWrite(config.TrustFilePath(), out, 0o600) +} + +// Add inserts or replaces a trust entry by NodeID. 
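// A rough sketch of the on-disk shape implied by the yaml tags above: a
// trust.yaml holding one confirmed peer would look roughly like this (all
// values are placeholders, not taken from a real deployment).
//
//	entries:
//	  - node_id: node-b
//	    address: 10.0.0.2:7474
//	    fingerprint: sha256:9f2a...
//	    public_key_pem: |
//	      -----BEGIN PUBLIC KEY-----
//	      ...
//	      -----END PUBLIC KEY-----
//	    added_at: 2026-05-12T06:07:16Z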
+func (s *Store) Add(e Entry) error { + if e.NodeID == "" || e.Fingerprint == "" { + return errors.New("trust entry needs node_id and fingerprint") + } + if e.AddedAt.IsZero() { + e.AddedAt = time.Now().UTC() + } + s.mu.Lock() + s.entries[e.NodeID] = e + s.mu.Unlock() + return s.Save() +} + +// Remove drops the entry for nodeID. Returns true if anything was +// actually removed. +func (s *Store) Remove(nodeID string) (bool, error) { + s.mu.Lock() + _, ok := s.entries[nodeID] + if ok { + delete(s.entries, nodeID) + } + s.mu.Unlock() + if !ok { + return false, nil + } + return true, s.Save() +} + +// List returns a copy of all entries. +func (s *Store) List() []Entry { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]Entry, 0, len(s.entries)) + for _, e := range s.entries { + out = append(out, e) + } + return out +} + +// Get returns the entry for the given NodeID and whether it was found. +func (s *Store) Get(nodeID string) (Entry, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + e, ok := s.entries[nodeID] + return e, ok +} + +// LookupByFingerprint finds the entry matching the given fingerprint +// regardless of NodeID. Useful when validating incoming TLS handshakes +// where the cert's CommonName carries the NodeID claim. +func (s *Store) LookupByFingerprint(fp string) (Entry, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + for _, e := range s.entries { + if e.Fingerprint == fp { + return e, true + } + } + return Entry{}, false +} + +// VerifyPeerCert is the tls.Config VerifyPeerCertificate callback. It +// rejects any cert whose fingerprint isn't in the trust store. +func (s *Store) VerifyPeerCert(rawCerts [][]byte, _ [][]*x509.Certificate) error { + if len(rawCerts) == 0 { + return errors.New("peer presented no certificate") + } + cert, err := x509.ParseCertificate(rawCerts[0]) + if err != nil { + return fmt.Errorf("parse peer cert: %w", err) + } + fp := crypto.Fingerprint(cert) + if _, ok := s.LookupByFingerprint(fp); !ok { + return fmt.Errorf("peer cert fingerprint %s not in trust store", fp) + } + return nil +}
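// A rough sketch of one way the defense-in-depth check described on
// WhoAmIResponse could lean on Get once an entry exists: after a WhoAmI call,
// the caller compares the claim against its own store. resp (a WhoAmIResponse)
// and store are assumed names for illustration.
//
//	entry, ok := store.Get(resp.NodeID)
//	if !ok {
//	    return fmt.Errorf("no trust entry for claimed node %s", resp.NodeID)
//	}
//	if entry.Fingerprint != resp.Fingerprint {
//	    return fmt.Errorf("node %s claims fingerprint %s, trust store has %s",
//	        resp.NodeID, resp.Fingerprint, entry.Fingerprint)
//	}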