From 139c224a31bcf468a420dce3c78ddc1869ee079c Mon Sep 17 00:00:00 2001 From: Axodouble Date: Tue, 12 May 2026 06:20:51 +0000 Subject: [PATCH] Added tests and readme --- README.md | 201 ++++++++++++++++++++++++++ internal/alerts/dispatcher.go | 4 +- internal/alerts/message_test.go | 52 +++++++ internal/checks/aggregator.go | 11 -- internal/checks/aggregator_test.go | 111 ++++++++++++++ internal/checks/probe_test.go | 118 +++++++++++++++ internal/cli/alert.go | 23 +-- internal/cli/check.go | 5 - internal/config/cluster.go | 35 +---- internal/config/cluster_test.go | 107 ++++++++++++++ internal/config/node_test.go | 58 ++++++++ internal/config/paths.go | 5 - internal/crypto/cert.go | 10 -- internal/crypto/crypto_test.go | 111 ++++++++++++++ internal/crypto/keys.go | 6 - internal/daemon/control.go | 3 +- internal/quorum/manager_test.go | 146 +++++++++++++++++++ internal/replicate/replicator.go | 11 +- internal/replicate/replicator_test.go | 164 +++++++++++++++++++++ internal/transport/frame_test.go | 76 ++++++++++ internal/transport/rpc.go | 6 - internal/transport/rpc_test.go | 186 ++++++++++++++++++++++++ internal/trust/store_test.go | 97 +++++++++++++ 23 files changed, 1449 insertions(+), 97 deletions(-) create mode 100644 README.md create mode 100644 internal/alerts/message_test.go create mode 100644 internal/checks/aggregator_test.go create mode 100644 internal/checks/probe_test.go create mode 100644 internal/config/cluster_test.go create mode 100644 internal/config/node_test.go create mode 100644 internal/crypto/crypto_test.go create mode 100644 internal/quorum/manager_test.go create mode 100644 internal/replicate/replicator_test.go create mode 100644 internal/transport/frame_test.go create mode 100644 internal/transport/rpc_test.go create mode 100644 internal/trust/store_test.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..7f1610a --- /dev/null +++ b/README.md @@ -0,0 +1,201 @@ +# qu — quorum-based uptime monitor + +`qu` is a small Linux 
daemon that watches HTTP, TCP, and ICMP endpoints +from several cooperating nodes. The nodes form a quorum cluster; one is +elected master and owns alert dispatch. A check is only reported as +**DOWN** when the majority of nodes agree, which keeps a single node's +flaky uplink from paging anyone at 3am. + +A single static binary contains the daemon, the CLI, and everything in +between. Inter-node traffic is mutual TLS with SSH-style fingerprint +trust — no central CA, no shared secret. + +## Why + +Most uptime monitors are either a SaaS or a single box that, by +definition, can't tell you when it's the one that's down. `qu` solves +both: run it on a few cheap hosts in different networks and they vote +on truth. If one of them loses its uplink, the rest keep alerting. + +## Architecture + +``` + +-------------- node A ---------------+ + | qu serve | + | ├─ transport server (mTLS :9001) | + | ├─ quorum manager (heartbeats) | + | ├─ replicator (cluster.yaml) | + | ├─ scheduler (HTTP/TCP/ICMP) | <─── probes targets + | ├─ aggregator (master-only) | + | ├─ alerts (master-only) | + | └─ control socket (unix, for CLI) | + +-------------------------------------+ + │ ▲ mTLS, pinned by fingerprint + ▼ │ + node B node C … +``` + +Every node runs every probe. Results are shipped to the elected master, +which folds them into a per-check sliding window. A state flips (UP↔DOWN) +only after **two consecutive aggregate evaluations** agree — that's +the hysteresis that absorbs network blips. + +Master election is deterministic: among the live members of the quorum, +the node with the lexicographically smallest NodeID wins. No +negotiation, no split-brain window. + +## Build + +Requires Go 1.23 or newer. + +```sh +go build -o qu ./cmd/qu +``` + +## Set up a 3-node cluster + +On each host: + +```sh +# 1. Generate identity + RSA-3072 keypair + self-signed cert. +qu init --advertise :9001 + +# 2. Start the daemon (foreground; wire it into systemd for prod). 
+qu serve +``` + +Pick one node and tell it about the other two. The CLI prints the +remote fingerprint and asks for confirmation, SSH-style: + +```sh +qu node add bravo.example.com:9001 +qu node add charlie.example.com:9001 +``` + +That's it — the master broadcasts the new cluster config to every +trusting peer. `qu status` from any node should now show all three: + +``` +node a7f3... +term 2 +master a7f3... +quorum true (need 2) +config ver 4 + +PEERS +NODE_ID ADVERTISE LIVE LAST_SEEN +a7f3... alpha.example.com:9001 true 2026-05-12T15:01:32Z +b21c... bravo.example.com:9001 true 2026-05-12T15:01:32Z +c0d4... charlie.example.com:9001 true 2026-05-12T15:01:32Z +``` + +## Adding checks and alerts + +```sh +# alerts first so checks can reference them +qu alert add discord oncall --webhook https://discord.com/api/webhooks/... +qu alert add smtp ops --host smtp.example.com --port 587 \ + --from monitor@example.com --to ops@example.com \ + --user mailbot --password '****' --starttls=true + +# checks +qu check add http homepage https://example.com --expect 200 --alerts oncall,ops +qu check add tcp db db.internal:5432 --interval 15s +qu check add icmp gateway 10.0.0.1 --interval 5s +``` + +Mutations always route to the master, which bumps a monotonic version +and pushes the new `cluster.yaml` to every peer. If quorum is lost, +mutating commands fail loudly. + +## Test an alert without waiting for a real outage + +```sh +qu alert test oncall +``` + +## File layout + +A node's state lives under `$QUPTIME_DIR` (defaults to `/etc/quptime` +when root, `~/.config/quptime` otherwise): + +``` +node.yaml identity (NodeID, bind addr, port). Never replicated. +cluster.yaml replicated state: peers, checks, alerts, version. +trust.yaml local fingerprint trust store. +keys/ RSA private + public + self-signed cert. 
+``` + +The CLI talks to the local daemon over a unix socket at +`$QUPTIME_SOCKET` (defaults to `/var/run/quptime/quptime.sock` when +root, `$XDG_RUNTIME_DIR/quptime/quptime.sock` otherwise) — filesystem +permissions guard it; no TLS on the local socket. + +## ICMP and capabilities + +ICMP checks default to unprivileged UDP-mode pings so the daemon does +not need root or `CAP_NET_RAW`. If you want classic raw ICMP, either +run the daemon as root or grant the capability: + +```sh +sudo setcap cap_net_raw=+ep ./qu +``` + +## CLI reference + +``` +qu init generate identity + keys +qu serve run the daemon +qu status quorum, master, check states +qu node add <host:port> TOFU-add a peer +qu node list show peers + liveness +qu node remove <node-id> remove from cluster + trust +qu check add http <name> <url> [--expect 200] [--interval 30s] [--body-match str] [--alerts a,b] +qu check add tcp <name> <host:port> +qu check add icmp <name> <host> +qu check list +qu check remove <id-or-name> +qu alert add smtp <name> --host … --port … --from … --to … [--user --password --starttls] +qu alert add discord <name> --webhook … +qu alert list / remove <id-or-name> / test <id-or-name> +qu trust list / remove <node-id> +``` + +All `--interval` and `--timeout` flags accept Go duration syntax: `5s`, +`1m30s`, `2h`, etc. + +## Tests + +```sh +go test ./... +go test -race ./... +``` + +Each internal package has unit tests; coverage hovers around 60–90 % +on the meaningful packages. The transport tests bring up real mTLS +listeners over loopback, which exercises the cert pinning end-to-end. + +## What's intentionally not here (v1) + +- No web UI. The CLI is the only operator surface. +- No historical metrics or SLA reports — only the current aggregate + state is kept in memory. Add SQLite later if you need graphs. +- No automatic key rotation. Re-init a node and re-trust if you need + to roll its identity. +- No multi-tenant isolation. One cluster = one set of checks. 
+ +## Layout + +``` +cmd/qu/ entry point +internal/config/ on-disk file layout, ClusterConfig, NodeConfig +internal/crypto/ RSA keypair + self-signed cert + SPKI fingerprints +internal/trust/ fingerprint trust store +internal/transport/ mTLS listener/dialer, framed JSON-RPC +internal/quorum/ heartbeats + deterministic master election +internal/replicate/ master-routed mutations, version-gated replication +internal/checks/ HTTP/TCP/ICMP probers, scheduler, aggregator +internal/alerts/ SMTP + Discord dispatchers, message rendering +internal/daemon/ glue: wires every component + control socket +internal/cli/ cobra commands, the user-facing surface +``` diff --git a/internal/alerts/dispatcher.go b/internal/alerts/dispatcher.go index 462de8a..1e288d2 100644 --- a/internal/alerts/dispatcher.go +++ b/internal/alerts/dispatcher.go @@ -32,7 +32,7 @@ func (d *Dispatcher) OnTransition(check *config.Check, from, to checks.State, sn } msg := Render(d.selfID, check, from, to, snap) for _, alertID := range check.AlertIDs { - alert, _ := d.cluster.FindAlert(alertID) + alert := d.cluster.FindAlert(alertID) if alert == nil { d.logger.Printf("alerts: check %q references unknown alert %q", check.Name, alertID) continue @@ -46,7 +46,7 @@ func (d *Dispatcher) OnTransition(check *config.Check, from, to checks.State, sn // Test sends a one-shot test message to the named alert. Returns an // error so the CLI can surface failures interactively. 
func (d *Dispatcher) Test(alertID string) error { - alert, _ := d.cluster.FindAlert(alertID) + alert := d.cluster.FindAlert(alertID) if alert == nil { return fmt.Errorf("alert %q not found", alertID) } diff --git a/internal/alerts/message_test.go b/internal/alerts/message_test.go new file mode 100644 index 0000000..4c58d01 --- /dev/null +++ b/internal/alerts/message_test.go @@ -0,0 +1,52 @@ +package alerts + +import ( + "strings" + "testing" + + "github.com/jasper/quptime/internal/checks" + "github.com/jasper/quptime/internal/config" +) + +func TestRenderDownTransition(t *testing.T) { + check := &config.Check{Name: "homepage", Target: "https://example.com", Type: config.CheckHTTP} + snap := checks.Snapshot{Reports: 3, OKCount: 0, NotOK: 3, Detail: "connection refused"} + msg := Render("master-node", check, checks.StateUp, checks.StateDown, snap) + + if !strings.Contains(msg.Subject, "DOWN") { + t.Errorf("subject missing DOWN: %q", msg.Subject) + } + if !strings.Contains(msg.Subject, "homepage") { + t.Errorf("subject missing check name: %q", msg.Subject) + } + if !strings.Contains(msg.Body, "connection refused") { + t.Errorf("body missing detail: %q", msg.Body) + } + if !strings.Contains(msg.Body, "master-node") { + t.Errorf("body missing reporter: %q", msg.Body) + } + if !strings.Contains(msg.Body, "3 (ok=0, fail=3)") { + t.Errorf("body missing report count: %q", msg.Body) + } +} + +func TestRenderRecoveryTransition(t *testing.T) { + check := &config.Check{Name: "api", Target: "https://api/", Type: config.CheckHTTP} + snap := checks.Snapshot{Reports: 3, OKCount: 3, NotOK: 0} + msg := Render("master", check, checks.StateDown, checks.StateUp, snap) + if !strings.Contains(msg.Subject, "RECOVERED") { + t.Errorf("subject missing RECOVERED: %q", msg.Subject) + } +} + +func TestRenderUpInitialTransition(t *testing.T) { + check := &config.Check{Name: "api", Target: "https://api/"} + snap := checks.Snapshot{Reports: 1, OKCount: 1} + msg := Render("master", check, 
checks.StateUnknown, checks.StateUp, snap) + if !strings.Contains(msg.Subject, "UP") { + t.Errorf("subject missing UP: %q", msg.Subject) + } + if strings.Contains(msg.Subject, "RECOVERED") { + t.Error("first-time UP should not be tagged RECOVERED") + } +} diff --git a/internal/checks/aggregator.go b/internal/checks/aggregator.go index 660ea5a..0b87860 100644 --- a/internal/checks/aggregator.go +++ b/internal/checks/aggregator.go @@ -99,17 +99,6 @@ func (a *Aggregator) Submit(nodeID string, r Result) { a.evaluate(r.CheckID) } -// SnapshotAll returns the current aggregate view of every known check. -func (a *Aggregator) SnapshotAll() map[string]Snapshot { - a.mu.Lock() - defer a.mu.Unlock() - out := make(map[string]Snapshot, len(a.perCheck)) - for id, st := range a.perCheck { - out[id] = a.snapshotLocked(id, st) - } - return out -} - // SnapshotFor returns the aggregate for a single check. func (a *Aggregator) SnapshotFor(checkID string) (Snapshot, bool) { a.mu.Lock() diff --git a/internal/checks/aggregator_test.go b/internal/checks/aggregator_test.go new file mode 100644 index 0000000..5af56bc --- /dev/null +++ b/internal/checks/aggregator_test.go @@ -0,0 +1,111 @@ +package checks + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/jasper/quptime/internal/config" +) + +func TestAggregatorHysteresisRequiresConsecutiveEvals(t *testing.T) { + cluster := &config.ClusterConfig{Checks: []config.Check{ + {ID: "c1", Name: "x", Interval: 10 * time.Second}, + }} + + var transitions atomic.Int32 + agg := NewAggregator(cluster, func(_ *config.Check, _, _ State, _ Snapshot) { + transitions.Add(1) + }) + + // First OK submission — candidate=Up, committed still Unknown. 
+ agg.Submit("nodeA", Result{CheckID: "c1", OK: true, Timestamp: time.Now()}) + snap, _ := agg.SnapshotFor("c1") + if snap.State != StateUnknown { + t.Errorf("after one tick state=%s want unknown", snap.State) + } + if transitions.Load() != 0 { + t.Errorf("transitions=%d after one tick, want 0", transitions.Load()) + } + + // Second OK — hysteresis satisfied, commit Up. + agg.Submit("nodeA", Result{CheckID: "c1", OK: true, Timestamp: time.Now()}) + snap, _ = agg.SnapshotFor("c1") + if snap.State != StateUp { + t.Errorf("after two ticks state=%s want up", snap.State) + } + if transitions.Load() != 1 { + t.Errorf("transitions=%d after commit, want 1", transitions.Load()) + } + + // Single failure — candidate flips to Down, committed stays Up. + agg.Submit("nodeA", Result{CheckID: "c1", OK: false, Detail: "boom", Timestamp: time.Now()}) + snap, _ = agg.SnapshotFor("c1") + if snap.State != StateUp { + t.Errorf("single fail flipped state prematurely: %s", snap.State) + } + + // Second failure — commit Down. + agg.Submit("nodeA", Result{CheckID: "c1", OK: false, Detail: "boom", Timestamp: time.Now()}) + snap, _ = agg.SnapshotFor("c1") + if snap.State != StateDown { + t.Errorf("after two fails state=%s want down", snap.State) + } + if transitions.Load() != 2 { + t.Errorf("transitions=%d after second commit, want 2", transitions.Load()) + } +} + +func TestAggregatorMajorityRule(t *testing.T) { + cluster := &config.ClusterConfig{Checks: []config.Check{ + {ID: "c1", Name: "x", Interval: 10 * time.Second}, + }} + agg := NewAggregator(cluster, nil) + + // 2 OK + 1 fail → candidate Up. 
+ now := time.Now() + agg.Submit("a", Result{CheckID: "c1", OK: true, Timestamp: now}) + agg.Submit("b", Result{CheckID: "c1", OK: true, Timestamp: now}) + agg.Submit("c", Result{CheckID: "c1", OK: false, Timestamp: now}) + + snap, _ := agg.SnapshotFor("c1") + if snap.OKCount != 2 || snap.NotOK != 1 { + t.Errorf("counts wrong: %+v", snap) + } + + // flip the majority + for i := 0; i < 2; i++ { + agg.Submit("a", Result{CheckID: "c1", OK: false, Timestamp: time.Now()}) + agg.Submit("b", Result{CheckID: "c1", OK: false, Timestamp: time.Now()}) + agg.Submit("c", Result{CheckID: "c1", OK: false, Timestamp: time.Now()}) + } + snap, _ = agg.SnapshotFor("c1") + if snap.State != StateDown { + t.Errorf("majority-fail did not transition to down: %s", snap.State) + } +} + +func TestAggregatorDropsUnknownChecks(t *testing.T) { + cluster := &config.ClusterConfig{} + agg := NewAggregator(cluster, nil) + + agg.Submit("a", Result{CheckID: "ghost", OK: true, Timestamp: time.Now()}) + if _, ok := agg.SnapshotFor("ghost"); ok { + t.Error("aggregator kept state for unconfigured check") + } +} + +func TestAggregatorIgnoresStaleResults(t *testing.T) { + cluster := &config.ClusterConfig{Checks: []config.Check{ + {ID: "c1", Name: "x", Interval: 10 * time.Second}, + }} + agg := NewAggregator(cluster, nil) + + old := time.Now().Add(-10 * time.Minute) + agg.Submit("a", Result{CheckID: "c1", OK: true, Timestamp: old}) + + snap, _ := agg.SnapshotFor("c1") + if snap.Reports != 0 { + t.Errorf("stale report counted: %+v", snap) + } +} diff --git a/internal/checks/probe_test.go b/internal/checks/probe_test.go new file mode 100644 index 0000000..cdfbf52 --- /dev/null +++ b/internal/checks/probe_test.go @@ -0,0 +1,118 @@ +package checks + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/jasper/quptime/internal/config" +) + +func TestHTTPProberHappyPath(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + w.Write([]byte("hello world")) + })) + defer srv.Close() + + res := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckHTTP, Target: srv.URL, + Timeout: 5 * time.Second, ExpectStatus: 200, + }) + if !res.OK { + t.Errorf("expected OK, got %+v", res) + } +} + +func TestHTTPProberBodyMatch(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + w.Write([]byte("the magic word is xyzzy and other stuff")) + })) + defer srv.Close() + + hit := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckHTTP, Target: srv.URL, + Timeout: 5 * time.Second, BodyMatch: "xyzzy", + }) + if !hit.OK { + t.Errorf("expected match, got %+v", hit) + } + + miss := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckHTTP, Target: srv.URL, + Timeout: 5 * time.Second, BodyMatch: "absent", + }) + if miss.OK { + t.Errorf("expected miss, got %+v", miss) + } + if !strings.Contains(miss.Detail, "body match") { + t.Errorf("detail unexpected: %q", miss.Detail) + } +} + +func TestHTTPProberStatusMismatch(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(500) + })) + defer srv.Close() + + res := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckHTTP, Target: srv.URL, Timeout: 5 * time.Second, + }) + if res.OK { + t.Errorf("500 should fail check, got %+v", res) + } +} + +func TestTCPProberHappyPath(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + go func() { + for { + c, err := ln.Accept() + if err != nil { + return + } + c.Close() + } + }() + + res := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckTCP, Target: ln.Addr().String(), + Timeout: 2 * time.Second, + }) + if !res.OK { + t.Errorf("expected OK, got %+v", res) + } +} + +func 
TestTCPProberRefusedConnection(t *testing.T) { + // Listen and immediately close so the address is known-bad. + ln, _ := net.Listen("tcp", "127.0.0.1:0") + addr := ln.Addr().String() + ln.Close() + + res := Run(context.Background(), &config.Check{ + ID: "c", Type: config.CheckTCP, Target: addr, Timeout: 1 * time.Second, + }) + if res.OK { + t.Errorf("dead address should fail check, got %+v", res) + } +} + +func TestRunUnknownCheckType(t *testing.T) { + res := Run(context.Background(), &config.Check{ + ID: "c", Type: "bogus", Target: "x", + }) + if res.OK { + t.Error("unknown check type should not succeed") + } +} diff --git a/internal/cli/alert.go b/internal/cli/alert.go index fc5d5ae..e9fe2db 100644 --- a/internal/cli/alert.go +++ b/internal/cli/alert.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "text/tabwriter" "time" "github.com/google/uuid" @@ -30,30 +31,16 @@ func addAlertCmd(root *cobra.Command) { Use: "list", Short: "List configured alerts", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) - defer cancel() - raw, err := callDaemon(ctx, daemon.CtrlStatus, nil) - if err != nil { - return err - } - var st transport.StatusResponse - if err := json.Unmarshal(raw, &st); err != nil { - return err - } - // status response doesn't carry alerts — call mutate with a - // "list" by reading cluster.yaml indirectly via status's - // version is not enough. Fall back: ask for ClusterConfig - // via a dedicated read RPC if needed. For v1 we rely on - // node.yaml being co-located: read the local cluster.yaml - // directly so the operator gets up-to-date output. 
cluster, err := config.LoadClusterConfig() if err != nil { return err } + tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) + fmt.Fprintln(tw, "ID\tTYPE\tNAME") for _, a := range cluster.Alerts { - fmt.Fprintf(cmd.OutOrStdout(), "%s\t%s\t%s\n", a.ID, a.Type, a.Name) + fmt.Fprintf(tw, "%s\t%s\t%s\n", a.ID, a.Type, a.Name) } - return nil + return tw.Flush() }, } diff --git a/internal/cli/check.go b/internal/cli/check.go index 3e7bdde..98702a4 100644 --- a/internal/cli/check.go +++ b/internal/cli/check.go @@ -30,7 +30,6 @@ func addCheckCmd(root *cobra.Command) { }) addHTTP.Flags().Int("expect", 200, "HTTP status code that signals UP") addHTTP.Flags().String("body-match", "", "substring required in response body for UP") - bindHTTPFlags(addHTTP) addTCP := buildAddCheckCmd(config.CheckTCP, "tcp", " ", "Add a TCP-connect check", @@ -168,7 +167,3 @@ func bindCheckFlags(cmd *cobra.Command) { cmd.Flags().String("timeout", "10s", "per-probe timeout") cmd.Flags().String("alerts", "", "comma-separated alert IDs/names to notify on transition") } - -// bindHTTPFlags is a no-op kept to mirror the per-type flag bind sites -// so the caller can extend cleanly later. -func bindHTTPFlags(cmd *cobra.Command) {} diff --git a/internal/config/cluster.go b/internal/config/cluster.go index 365b713..cff6648 100644 --- a/internal/config/cluster.go +++ b/internal/config/cluster.go @@ -175,43 +175,18 @@ func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) { return true, nil } -// FindCheck returns the check with the given ID or name. -func (c *ClusterConfig) FindCheck(idOrName string) (*Check, int) { - c.mu.RLock() - defer c.mu.RUnlock() - for i := range c.Checks { - if c.Checks[i].ID == idOrName || c.Checks[i].Name == idOrName { - cp := c.Checks[i] - return &cp, i - } - } - return nil, -1 -} - -// FindAlert returns the alert with the given ID or name. 
-func (c *ClusterConfig) FindAlert(idOrName string) (*Alert, int) { +// FindAlert returns the alert with the given ID or name, or nil if +// no entry matches. +func (c *ClusterConfig) FindAlert(idOrName string) *Alert { c.mu.RLock() defer c.mu.RUnlock() for i := range c.Alerts { if c.Alerts[i].ID == idOrName || c.Alerts[i].Name == idOrName { cp := c.Alerts[i] - return &cp, i + return &cp } } - return nil, -1 -} - -// FindPeer returns the peer with the given node ID. -func (c *ClusterConfig) FindPeer(nodeID string) (*PeerInfo, int) { - c.mu.RLock() - defer c.mu.RUnlock() - for i := range c.Peers { - if c.Peers[i].NodeID == nodeID { - cp := c.Peers[i] - return &cp, i - } - } - return nil, -1 + return nil } // QuorumSize returns the minimum number of live nodes required for diff --git a/internal/config/cluster_test.go b/internal/config/cluster_test.go new file mode 100644 index 0000000..c13993c --- /dev/null +++ b/internal/config/cluster_test.go @@ -0,0 +1,107 @@ +package config + +import ( + "fmt" + "testing" +) + +func TestQuorumSize(t *testing.T) { + cases := []struct { + peers int + want int + }{ + {0, 1}, + {1, 1}, + {2, 2}, + {3, 2}, + {4, 3}, + {5, 3}, + {7, 4}, + } + for _, tc := range cases { + c := &ClusterConfig{} + for i := 0; i < tc.peers; i++ { + c.Peers = append(c.Peers, PeerInfo{NodeID: fmt.Sprintf("n%d", i)}) + } + if got := c.QuorumSize(); got != tc.want { + t.Errorf("peers=%d: QuorumSize=%d want %d", tc.peers, got, tc.want) + } + } +} + +func TestClusterMutateBumpsVersion(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + c := &ClusterConfig{} + + err := c.Mutate("nodeA", func(cc *ClusterConfig) error { + cc.Checks = append(cc.Checks, Check{ID: "1", Name: "x"}) + return nil + }) + if err != nil { + t.Fatal(err) + } + if c.Version != 1 { + t.Errorf("Version=%d want 1", c.Version) + } + if c.UpdatedBy != "nodeA" { + t.Errorf("UpdatedBy=%q want nodeA", c.UpdatedBy) + } + + err = c.Mutate("nodeB", func(cc *ClusterConfig) error { return nil }) + if 
err != nil { + t.Fatal(err) + } + if c.Version != 2 { + t.Errorf("Version=%d want 2 after second mutate", c.Version) + } +} + +func TestClusterReplaceGatesOnVersion(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + cur := &ClusterConfig{Version: 5, Checks: []Check{{ID: "old"}}} + + if applied, _ := cur.Replace(&ClusterConfig{Version: 4}); applied { + t.Error("older version was applied") + } + if applied, _ := cur.Replace(&ClusterConfig{Version: 5}); applied { + t.Error("equal version was applied") + } + applied, err := cur.Replace(&ClusterConfig{ + Version: 6, + Checks: []Check{{ID: "new"}}, + }) + if err != nil { + t.Fatal(err) + } + if !applied { + t.Error("newer version was not applied") + } + if cur.Version != 6 || len(cur.Checks) != 1 || cur.Checks[0].ID != "new" { + t.Errorf("after replace: %+v", cur) + } +} + +func TestClusterSnapshotIsCopy(t *testing.T) { + c := &ClusterConfig{Checks: []Check{{ID: "a"}}} + snap := c.Snapshot() + snap.Checks[0].ID = "b" + if c.Checks[0].ID != "a" { + t.Error("snapshot mutation leaked back to original") + } +} + +func TestFindAlert(t *testing.T) { + c := &ClusterConfig{Alerts: []Alert{ + {ID: "id-1", Name: "primary", Type: AlertSMTP}, + {ID: "id-2", Name: "secondary", Type: AlertDiscord}, + }} + if a := c.FindAlert("primary"); a == nil || a.Type != AlertSMTP { + t.Errorf("by name: %+v", a) + } + if a := c.FindAlert("id-2"); a == nil || a.Type != AlertDiscord { + t.Errorf("by id: %+v", a) + } + if a := c.FindAlert("ghost"); a != nil { + t.Errorf("expected nil for missing, got %+v", a) + } +} diff --git a/internal/config/node_test.go b/internal/config/node_test.go new file mode 100644 index 0000000..4118ebf --- /dev/null +++ b/internal/config/node_test.go @@ -0,0 +1,58 @@ +package config + +import "testing" + +func TestAdvertiseAddrFallback(t *testing.T) { + cases := []struct { + name string + cfg NodeConfig + want string + }{ + {"explicit advertise wins", NodeConfig{Advertise: "host:1234", BindAddr: "0.0.0.0", BindPort: 
9001}, "host:1234"}, + {"empty bind falls back to loopback", NodeConfig{BindPort: 9001}, "127.0.0.1:9001"}, + {"wildcard bind falls back to loopback", NodeConfig{BindAddr: "0.0.0.0", BindPort: 9001}, "127.0.0.1:9001"}, + {"ipv6 wildcard falls back to loopback", NodeConfig{BindAddr: "::", BindPort: 9001}, "127.0.0.1:9001"}, + {"specific bind preserved", NodeConfig{BindAddr: "10.0.0.1", BindPort: 9001}, "10.0.0.1:9001"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := tc.cfg.AdvertiseAddr(); got != tc.want { + t.Errorf("got %q want %q", got, tc.want) + } + }) + } +} + +func TestNodeConfigRoundtrip(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + n := &NodeConfig{NodeID: "abc", BindAddr: "127.0.0.1", BindPort: 9001, Advertise: "10.0.0.1:9001"} + if err := n.Save(); err != nil { + t.Fatal(err) + } + loaded, err := LoadNodeConfig() + if err != nil { + t.Fatal(err) + } + if *loaded != *n { + t.Errorf("got %+v want %+v", *loaded, *n) + } +} + +func TestLoadNodeConfigAppliesDefaults(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + // Save with empty bind addr/port to verify Load fills them. 
+ n := &NodeConfig{NodeID: "abc"} + if err := n.Save(); err != nil { + t.Fatal(err) + } + loaded, err := LoadNodeConfig() + if err != nil { + t.Fatal(err) + } + if loaded.BindPort != 9001 { + t.Errorf("BindPort=%d want 9001", loaded.BindPort) + } + if loaded.BindAddr != "0.0.0.0" { + t.Errorf("BindAddr=%q want 0.0.0.0", loaded.BindAddr) + } +} diff --git a/internal/config/paths.go b/internal/config/paths.go index 4335818..a1f71ba 100644 --- a/internal/config/paths.go +++ b/internal/config/paths.go @@ -6,7 +6,6 @@ // cluster.yaml — replicated state (peers, checks, alerts, version) // trust.yaml — local fingerprint trust store // keys/ — RSA private + public keys + self-signed cert -// state.json — runtime cache (last check results, current master) // // A unix socket for the local CLI lives alongside (defaults to // /var/run/quptime/quptime.sock when running as root, otherwise @@ -25,7 +24,6 @@ const ( NodeFile = "node.yaml" ClusterFile = "cluster.yaml" TrustFile = "trust.yaml" - StateFile = "state.json" KeysDir = "keys" PrivateKey = "private.pem" PublicKey = "public.pem" @@ -86,9 +84,6 @@ func ClusterFilePath() string { return filepath.Join(DataDir(), ClusterFile) } // TrustFilePath returns the absolute path to trust.yaml. func TrustFilePath() string { return filepath.Join(DataDir(), TrustFile) } -// StateFilePath returns the absolute path to state.json. -func StateFilePath() string { return filepath.Join(DataDir(), StateFile) } - // PrivateKeyPath returns the absolute path to the RSA private key. func PrivateKeyPath() string { return filepath.Join(DataDir(), KeysDir, PrivateKey) } diff --git a/internal/crypto/cert.go b/internal/crypto/cert.go index c84f8dd..825bdd5 100644 --- a/internal/crypto/cert.go +++ b/internal/crypto/cert.go @@ -65,13 +65,3 @@ func FingerprintFromCertPEM(certPEM []byte) (string, error) { } return Fingerprint(cert), nil } - -// FingerprintFromPubKeyPEM parses a public-key PEM and returns its -// fingerprint over the same SPKI bytes. 
-func FingerprintFromPubKeyPEM(pubPEM []byte) (string, error) { - block, _ := pem.Decode(pubPEM) - if block == nil { - return "", errors.New("pubkey: no PEM block") - } - return FingerprintFromSPKI(block.Bytes), nil -} diff --git a/internal/crypto/crypto_test.go b/internal/crypto/crypto_test.go new file mode 100644 index 0000000..a46d3ef --- /dev/null +++ b/internal/crypto/crypto_test.go @@ -0,0 +1,111 @@ +package crypto + +import ( + "crypto/x509" + "encoding/pem" + "strings" + "testing" +) + +func TestGenerateAndLoadKeyPair(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + + priv, err := GenerateKeyPair("node-1") + if err != nil { + t.Fatalf("GenerateKeyPair: %v", err) + } + if priv.N.BitLen() < KeySize-8 { + t.Errorf("key too small: %d bits", priv.N.BitLen()) + } + + // Refusing to overwrite existing material is part of the contract. + if _, err := GenerateKeyPair("node-1"); err == nil { + t.Error("expected error on re-generate") + } + + loaded, err := LoadPrivateKey() + if err != nil { + t.Fatalf("LoadPrivateKey: %v", err) + } + if loaded.N.Cmp(priv.N) != 0 { + t.Error("loaded key modulus differs from generated") + } +} + +func TestFingerprintDeterminismAndUniqueness(t *testing.T) { + t.Setenv("QUPTIME_DIR", t.TempDir()) + priv, err := GenerateKeyPair("node-x") + if err != nil { + t.Fatal(err) + } + + certPEM, err := LoadCertPEM() + if err != nil { + t.Fatal(err) + } + block, _ := pem.Decode(certPEM) + if block == nil { + t.Fatal("no PEM block in cert") + } + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + t.Fatal(err) + } + + fp1 := Fingerprint(cert) + fp2 := Fingerprint(cert) + if fp1 != fp2 { + t.Errorf("non-deterministic: %s vs %s", fp1, fp2) + } + if !strings.HasPrefix(fp1, "sha256:") { + t.Errorf("missing sha256: prefix: %s", fp1) + } + + pemFP, err := FingerprintFromCertPEM(certPEM) + if err != nil { + t.Fatal(err) + } + if pemFP != fp1 { + t.Errorf("PEM-derived fingerprint differs: %s vs %s", pemFP, fp1) + } + + // Now 
generate a fresh cert from the same key — fingerprint must + // match (SPKI is identical). + derSame, err := buildSelfSignedCert(priv, "node-x") + if err != nil { + t.Fatal(err) + } + certSame, _ := x509.ParseCertificate(derSame) + if Fingerprint(certSame) != fp1 { + t.Error("fingerprint changed across cert regen with same key") + } +} + +func TestFingerprintDiffersAcrossKeys(t *testing.T) { + dirA := t.TempDir() + dirB := t.TempDir() + + t.Setenv("QUPTIME_DIR", dirA) + if _, err := GenerateKeyPair("a"); err != nil { + t.Fatal(err) + } + pemA, _ := LoadCertPEM() + fpA, _ := FingerprintFromCertPEM(pemA) + + t.Setenv("QUPTIME_DIR", dirB) + if _, err := GenerateKeyPair("b"); err != nil { + t.Fatal(err) + } + pemB, _ := LoadCertPEM() + fpB, _ := FingerprintFromCertPEM(pemB) + + if fpA == fpB { + t.Error("two independent keys produced the same fingerprint") + } +} + +func TestFingerprintFromCertPEMRejectsGarbage(t *testing.T) { + if _, err := FingerprintFromCertPEM([]byte("not a pem")); err == nil { + t.Error("expected error on non-PEM input") + } +} diff --git a/internal/crypto/keys.go b/internal/crypto/keys.go index c5f569e..2c0ccd2 100644 --- a/internal/crypto/keys.go +++ b/internal/crypto/keys.go @@ -88,12 +88,6 @@ func LoadCertPEM() ([]byte, error) { return os.ReadFile(config.CertFilePath()) } -// LoadPublicKeyPEM reads the public-key PEM (exchanged out of band -// during invite / join). 
-func LoadPublicKeyPEM() ([]byte, error) {
-	return os.ReadFile(config.PublicKeyPath())
-}
-
 func writePEM(path, blockType string, der []byte, perm os.FileMode) error {
 	encoded := pem.EncodeToMemory(&pem.Block{Type: blockType, Bytes: der})
 	return config.AtomicWrite(path, encoded, perm)
diff --git a/internal/daemon/control.go b/internal/daemon/control.go
index 731626e..68f06bd 100644
--- a/internal/daemon/control.go
+++ b/internal/daemon/control.go
@@ -196,8 +196,7 @@ func (c *controlServer) dispatch(ctx context.Context, req CtrlRequest) CtrlRespo
 		if err := json.Unmarshal(req.Body, &body); err != nil {
 			return fail(err)
 		}
-		var payload json.RawMessage = body.Payload
-		ver, err := c.d.replicator.LocalMutate(ctx, body.Kind, json.RawMessage(payload))
+		ver, err := c.d.replicator.LocalMutate(ctx, body.Kind, body.Payload)
 		if err != nil {
 			return fail(err)
 		}
diff --git a/internal/quorum/manager_test.go b/internal/quorum/manager_test.go
new file mode 100644
index 0000000..42ef6d6
--- /dev/null
+++ b/internal/quorum/manager_test.go
@@ -0,0 +1,145 @@
+package quorum
+
+import (
+	"testing"
+	"time"
+
+	"github.com/jasper/quptime/internal/config"
+	"github.com/jasper/quptime/internal/transport"
+)
+
+func threeNode(self string) (*config.ClusterConfig, *Manager) {
+	cluster := &config.ClusterConfig{Peers: []config.PeerInfo{
+		{NodeID: "a"}, {NodeID: "b"}, {NodeID: "c"},
+	}}
+	return cluster, New(self, cluster, nil)
+}
+
+func TestSoloNodeElectsItself(t *testing.T) {
+	cluster := &config.ClusterConfig{}
+	m := New("only", cluster, nil)
+	m.markLive("only")
+	m.recomputeMaster()
+	if m.Master() != "only" {
+		t.Errorf("Master=%q want %q", m.Master(), "only")
+	}
+	if !m.HasQuorum() {
+		t.Error("solo node should have quorum")
+	}
+	if m.Term() != 1 {
+		t.Errorf("Term=%d want 1 after first election", m.Term())
+	}
+}
+
+func TestThreeNodeElectsLowestNodeID(t *testing.T) {
+	_, m := threeNode("b")
+	m.markLive("a")
+	m.markLive("b")
+	m.markLive("c")
+	m.recomputeMaster()
+	if got := m.Master(); got != "a" {
+		t.Errorf("Master=%q want a", got)
+	}
+	if !m.HasQuorum() {
+		t.Error("expected quorum with 3 live of 3")
+	}
+}
+
+func TestNoQuorumClearsMaster(t *testing.T) {
+	_, m := threeNode("b")
+	m.markLive("b")
+	m.recomputeMaster()
+	if m.Master() != "" {
+		t.Errorf("Master=%q want empty (no quorum)", m.Master())
+	}
+	if m.HasQuorum() {
+		t.Error("1 of 3 live should not be quorum")
+	}
+}
+
+func TestTermBumpsOnMasterChange(t *testing.T) {
+	_, m := threeNode("b")
+	m.markLive("a")
+	m.markLive("b")
+	m.recomputeMaster()
+	termBefore := m.Term()
+	masterBefore := m.Master()
+	if masterBefore != "a" {
+		t.Fatalf("expected initial master a, got %q", masterBefore)
+	}
+
+	// "a" goes dead — we and "c" join up.
+	m.mu.Lock()
+	delete(m.lastSeen, "a")
+	m.mu.Unlock()
+	m.markLive("c")
+	m.recomputeMaster()
+	if m.Master() != "b" {
+		t.Errorf("after a-fail Master=%q want b", m.Master())
+	}
+	if m.Term() <= termBefore {
+		t.Errorf("Term did not bump: before=%d after=%d", termBefore, m.Term())
+	}
+}
+
+func TestHandleHeartbeatMarksSenderLive(t *testing.T) {
+	_, m := threeNode("a")
+	resp := m.HandleHeartbeat(transport.HeartbeatRequest{
+		FromNodeID: "b",
+		Term:       7,
+		MasterID:   "a",
+		Version:    3,
+	})
+	if resp.NodeID != "a" {
+		t.Errorf("response NodeID=%q want a", resp.NodeID)
+	}
+	if _, ok := m.Liveness()["b"]; !ok {
+		t.Error("sender was not recorded live")
+	}
+}
+
+func TestDeadAfterEvictsStaleLiveness(t *testing.T) {
+	_, m := threeNode("a")
+	m.deadAfter = 50 * time.Millisecond
+	m.markLive("a")
+	m.markLive("b")
+	m.markLive("c")
+	m.recomputeMaster()
+	if m.Master() != "a" {
+		t.Fatalf("expected initial master a, got %q", m.Master())
+	}
+
+	// Wait past the dead-after window — only self remains live.
+	time.Sleep(120 * time.Millisecond)
+	m.markLive("a")
+	m.recomputeMaster()
+	if m.Master() != "" {
+		t.Errorf("expected no master after peers timed out, got %q", m.Master())
+	}
+}
+
+func TestVersionObserverFiresOnHigherVersion(t *testing.T) {
+	cluster := &config.ClusterConfig{Version: 2}
+	m := New("a", cluster, nil)
+
+	var notified struct {
+		peerID  string
+		peerVer uint64
+		count   int
+	}
+	m.SetVersionObserver(func(peerID, _ string, peerVer uint64) {
+		notified.peerID = peerID
+		notified.peerVer = peerVer
+		notified.count++
+	})
+
+	m.maybeNotifyVersion("b", 5)
+	if notified.count != 1 || notified.peerID != "b" || notified.peerVer != 5 {
+		t.Errorf("expected observer fired with b=5, got %+v", notified)
+	}
+
+	m.maybeNotifyVersion("b", 1)
+	if notified.count != 1 {
+		t.Errorf("observer fired for stale version, count=%d", notified.count)
+	}
+}
diff --git a/internal/replicate/replicator.go b/internal/replicate/replicator.go
index 83a86f2..9d771fa 100644
--- a/internal/replicate/replicator.go
+++ b/internal/replicate/replicator.go
@@ -36,16 +36,23 @@ type MasterView interface {
 	HasQuorum() bool
 }
 
+// RPCClient is the slice of *transport.Client that the replicator
+// actually uses. Pulled out as an interface so tests can stub it
+// without bringing up a TLS listener.
+type RPCClient interface {
+	Call(ctx context.Context, nodeID, addr, method string, params, out any) error
+}
+
 // Replicator drives mutation routing and broadcast.
 type Replicator struct {
 	selfID  string
 	cluster *config.ClusterConfig
-	client  *transport.Client
+	client  RPCClient
 	master  MasterView
 }
 
 // New constructs a replicator. selfID is this node's NodeID.
-func New(selfID string, cluster *config.ClusterConfig, client *transport.Client, master MasterView) *Replicator {
+func New(selfID string, cluster *config.ClusterConfig, client RPCClient, master MasterView) *Replicator {
 	return &Replicator{
 		selfID:  selfID,
 		cluster: cluster,
diff --git a/internal/replicate/replicator_test.go b/internal/replicate/replicator_test.go
new file mode 100644
index 0000000..2c95c4a
--- /dev/null
+++ b/internal/replicate/replicator_test.go
@@ -0,0 +1,166 @@
+package replicate
+
+import (
+	"context"
+	"encoding/json"
+	"sync"
+	"testing"
+
+	"github.com/jasper/quptime/internal/config"
+	"github.com/jasper/quptime/internal/transport"
+)
+
+type fakeMaster struct {
+	master    string
+	isMaster  bool
+	hasQuorum bool
+}
+
+func (f *fakeMaster) Master() string { return f.master }
+func (f *fakeMaster) IsMaster() bool { return f.isMaster }
+func (f *fakeMaster) HasQuorum() bool { return f.hasQuorum }
+
+// stubClient records every Call without doing any actual I/O.
+type stubClient struct {
+	mu    sync.Mutex
+	calls []string
+}
+
+func (s *stubClient) Call(_ context.Context, _, _, method string, _, _ any) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.calls = append(s.calls, method)
+	return nil
+}
+
+func newReplicator(t *testing.T, isMaster, hasQuorum bool) (*Replicator, *config.ClusterConfig, *stubClient) {
+	t.Helper()
+	t.Setenv("QUPTIME_DIR", t.TempDir())
+	cluster := &config.ClusterConfig{}
+	fm := &fakeMaster{master: "self", isMaster: isMaster, hasQuorum: hasQuorum}
+	stub := &stubClient{}
+	r := New("self", cluster, stub, fm)
+	return r, cluster, stub
+}
+
+func TestApplyAddCheck(t *testing.T) {
+	r, cluster, _ := newReplicator(t, true, true)
+	payload, _ := json.Marshal(config.Check{ID: "c1", Name: "homepage", Type: config.CheckHTTP, Target: "https://example.com"})
+
+	ver, err := r.LocalMutate(context.Background(), transport.MutationAddCheck, json.RawMessage(payload))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if ver != 1 {
+		t.Errorf("version=%d want 1", ver)
+	}
+	if len(cluster.Snapshot().Checks) != 1 {
+		t.Errorf("expected 1 check, got %d", len(cluster.Snapshot().Checks))
+	}
+}
+
+func TestApplyRemoveCheck(t *testing.T) {
+	r, cluster, _ := newReplicator(t, true, true)
+	if err := cluster.Mutate("self", func(c *config.ClusterConfig) error {
+		c.Checks = []config.Check{{ID: "c1", Name: "x"}, {ID: "c2", Name: "y"}}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	target, _ := json.Marshal("x")
+	ver, err := r.LocalMutate(context.Background(), transport.MutationRemoveCheck, json.RawMessage(target))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if ver < 2 {
+		t.Errorf("version did not advance: %d", ver)
+	}
+	cs := cluster.Snapshot().Checks
+	if len(cs) != 1 || cs[0].ID != "c2" {
+		t.Errorf("expected only c2 remaining, got %+v", cs)
+	}
+}
+
+func TestApplyAddAndRemoveAlertAndPeer(t *testing.T) {
+	r, cluster, _ := newReplicator(t, true, true)
+
+	alert, _ := json.Marshal(config.Alert{ID: "a1", Name: "notify", Type: config.AlertDiscord})
+	if _, err := r.LocalMutate(context.Background(), transport.MutationAddAlert, json.RawMessage(alert)); err != nil {
+		t.Fatal(err)
+	}
+
+	peer, _ := json.Marshal(config.PeerInfo{NodeID: "p1", Advertise: "10.0.0.1:9001", Fingerprint: "fp"})
+	if _, err := r.LocalMutate(context.Background(), transport.MutationAddPeer, json.RawMessage(peer)); err != nil {
+		t.Fatal(err)
+	}
+
+	snap := cluster.Snapshot()
+	if len(snap.Alerts) != 1 || len(snap.Peers) != 1 {
+		t.Fatalf("missing entries: %+v", snap)
+	}
+
+	target, _ := json.Marshal("notify")
+	if _, err := r.LocalMutate(context.Background(), transport.MutationRemoveAlert, json.RawMessage(target)); err != nil {
+		t.Fatal(err)
+	}
+	target, _ = json.Marshal("p1")
+	if _, err := r.LocalMutate(context.Background(), transport.MutationRemovePeer, json.RawMessage(target)); err != nil {
+		t.Fatal(err)
+	}
+
+	snap = cluster.Snapshot()
+	if len(snap.Alerts) != 0 || len(snap.Peers) != 0 {
+		t.Errorf("entries not removed: %+v", snap)
+	}
+}
+
+func TestMutateRequiresQuorum(t *testing.T) {
+	r, _, _ := newReplicator(t, true, false)
+	_, err := r.LocalMutate(context.Background(), transport.MutationAddCheck, json.RawMessage("{}"))
+	if err == nil {
+		t.Error("expected quorum-required error")
+	}
+}
+
+func TestHandleApplyClusterCfgGatesOnVersion(t *testing.T) {
+	r, cluster, _ := newReplicator(t, false, true)
+	// Push local version to 7 directly via Replace (Mutate would
+	// implicitly bump to 8 and confuse the test cases below).
+	if _, err := cluster.Replace(&config.ClusterConfig{Version: 7}); err != nil {
+		t.Fatal(err)
+	}
+
+	if resp := r.HandleApplyClusterCfg(transport.ApplyClusterCfgRequest{
+		Config: &config.ClusterConfig{Version: 6},
+	}); resp.Applied {
+		t.Error("older snapshot was applied")
+	}
+	if resp := r.HandleApplyClusterCfg(transport.ApplyClusterCfgRequest{
+		Config: &config.ClusterConfig{Version: 7},
+	}); resp.Applied {
+		t.Error("same-version snapshot was applied")
+	}
+
+	resp := r.HandleApplyClusterCfg(transport.ApplyClusterCfgRequest{
+		Config: &config.ClusterConfig{Version: 8, Checks: []config.Check{{ID: "n"}}},
+	})
+	if !resp.Applied {
+		t.Error("newer snapshot was rejected")
+	}
+	if cluster.Snapshot().Version != 8 {
+		t.Errorf("local version did not advance: %d", cluster.Snapshot().Version)
+	}
+}
+
+func TestHandleProposeMutationRejectsNonMaster(t *testing.T) {
+	r, _, _ := newReplicator(t, false, true)
+	resp := r.HandleProposeMutation(context.Background(), transport.ProposeMutationRequest{
+		FromNodeID: "follower",
+		Kind:       transport.MutationAddCheck,
+		Payload:    json.RawMessage(`{}`),
+	})
+	if resp.Error == "" {
+		t.Error("follower accepted a proposal")
+	}
+}
diff --git a/internal/transport/frame_test.go b/internal/transport/frame_test.go
new file mode 100644
index 0000000..3cb973c
--- /dev/null
+++ b/internal/transport/frame_test.go
@@ -0,0 +1,77 @@
+package transport
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"testing"
+)
+
+func TestFrameRoundtrip(t *testing.T) {
+	cases := [][]byte{
+		nil,
+		{},
+		[]byte("hello"),
+		bytes.Repeat([]byte("x"), 1<<14),
+	}
+	for _, payload := range cases {
+		var buf bytes.Buffer
+		if err := writeFrame(&buf, payload); err != nil {
+			t.Fatalf("write %d bytes: %v", len(payload), err)
+		}
+		out, err := readFrame(&buf)
+		if err != nil {
+			t.Fatalf("read %d bytes: %v", len(payload), err)
+		}
+		if !bytes.Equal(out, payload) {
+			t.Errorf("roundtrip lost data for %d bytes", len(payload))
+		}
+	}
+}
+
+func TestFrameRejectsOversize(t *testing.T) {
+	var buf bytes.Buffer
+	if err := writeFrame(&buf, bytes.Repeat([]byte{0}, MaxFrameSize+1)); err == nil {
+		t.Error("oversized write was accepted")
+	}
+}
+
+func TestFrameRejectsOversizeOnRead(t *testing.T) {
+	// hand-crafted header announcing a size beyond the cap
+	var buf bytes.Buffer
+	buf.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF}) // ~4GiB
+	if _, err := readFrame(&buf); err == nil {
+		t.Error("oversized read was accepted")
+	}
+}
+
+func TestFrameReportsShortRead(t *testing.T) {
+	var buf bytes.Buffer
+	// header says 10 bytes, body only 3
+	buf.Write([]byte{0, 0, 0, 10})
+	buf.WriteString("abc")
+	if _, err := readFrame(&buf); err == nil {
+		t.Error("short body did not error")
+	}
+}
+
+func TestMultipleFramesInOneStream(t *testing.T) {
+	var buf bytes.Buffer
+	for _, s := range []string{"first", "second", "third"} {
+		if err := writeFrame(&buf, []byte(s)); err != nil {
+			t.Fatal(err)
+		}
+	}
+	for _, want := range []string{"first", "second", "third"} {
+		got, err := readFrame(&buf)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if string(got) != want {
+			t.Errorf("got %q want %q", got, want)
+		}
+	}
+	if _, err := readFrame(&buf); !errors.Is(err, io.EOF) {
+		t.Errorf("expected EOF, got %v", err)
+	}
+}
diff --git a/internal/transport/rpc.go b/internal/transport/rpc.go
index 2170195..ab8138d 100644
--- a/internal/transport/rpc.go
+++ b/internal/transport/rpc.go
@@ -3,7 +3,6 @@ package transport
 import (
 	"context"
 	"crypto/tls"
-	"crypto/x509"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -322,8 +321,3 @@
func peerNodeIDFromConnState(cs tls.ConnectionState) string {
 	}
 	return cs.PeerCertificates[0].Subject.CommonName
 }
-
-// fingerprintOf is a small local mirror to keep this file independent
-// of the crypto package's import path at link time; we recompute the
-// SPKI hash here. Defined in tofu.go.
-var _ = (*x509.Certificate)(nil)
diff --git a/internal/transport/rpc_test.go b/internal/transport/rpc_test.go
new file mode 100644
index 0000000..6dd2e9e
--- /dev/null
+++ b/internal/transport/rpc_test.go
@@ -0,0 +1,202 @@
+package transport
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/jasper/quptime/internal/crypto"
+	"github.com/jasper/quptime/internal/trust"
+)
+
+// testNode bundles everything one side of the handshake needs.
+type testNode struct {
+	id     string
+	dir    string
+	assets *TLSAssets
+	fp     string
+}
+
+// makeNode builds keys + cert + an empty trust store rooted at dir.
+// After every disk-touching trust operation the caller must ensure
+// QUPTIME_DIR points back at this node's dir.
+func makeNode(t *testing.T, dir, id string) *testNode {
+	t.Helper()
+	t.Setenv("QUPTIME_DIR", dir)
+	priv, err := crypto.GenerateKeyPair(id)
+	if err != nil {
+		t.Fatal(err)
+	}
+	certPEM, err := crypto.LoadCertPEM()
+	if err != nil {
+		t.Fatal(err)
+	}
+	fp, err := crypto.FingerprintFromCertPEM(certPEM)
+	if err != nil {
+		t.Fatal(err)
+	}
+	store, err := trust.Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	return &testNode{
+		id:     id,
+		dir:    dir,
+		assets: &TLSAssets{Cert: certPEM, Key: priv, Trust: store},
+		fp:     fp,
+	}
+}
+
+func (n *testNode) trust(t *testing.T, other *testNode, addr string) {
+	t.Helper()
+	t.Setenv("QUPTIME_DIR", n.dir)
+	if err := n.assets.Trust.Add(trust.Entry{
+		NodeID: other.id, Address: addr, Fingerprint: other.fp,
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// freeAddr reserves an ephemeral loopback port and releases it again so
+// the server under test can bind it. The brief race window between the
+// release and the later bind is acceptable for tests.
+func freeAddr(t *testing.T) string {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	addr := ln.Addr().String()
+	if err := ln.Close(); err != nil {
+		t.Fatal(err)
+	}
+	return addr
+}
+
+func TestRPCRoundtrip(t *testing.T) {
+	a := makeNode(t, t.TempDir(), "node-a")
+	b := makeNode(t, t.TempDir(), "node-b")
+
+	addr := freeAddr(t)
+
+	a.trust(t, b, addr)
+	b.trust(t, a, addr)
+
+	srv := NewServer(a.assets)
+	srv.Handle("Echo", func(_ context.Context, peer string, payload json.RawMessage) (any, error) {
+		var s string
+		if err := json.Unmarshal(payload, &s); err != nil {
+			return nil, err
+		}
+		if peer != b.id {
+			return nil, errors.New("unexpected peer id: " + peer)
+		}
+		return s + " ack", nil
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	done := make(chan error, 1)
+	go func() { done <- srv.Serve(ctx, addr) }()
+	defer srv.Stop()
+
+	if !waitForDial(addr, 2*time.Second) {
+		// Prefer Serve's own error (e.g. a lost port race) over a bare timeout.
+		select {
+		case err := <-done:
+			t.Fatalf("server exited before listening: %v", err)
+		default:
+			t.Fatal("server did not start listening in time")
+		}
+	}
+
+	cli := NewClient(b.assets)
+	defer cli.Close()
+
+	callCtx, callCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer callCancel()
+	var got string
+	if err := cli.Call(callCtx, a.id, addr, "Echo", "hello", &got); err != nil {
+		t.Fatalf("Call: %v", err)
+	}
+	if got != "hello ack" {
+		t.Errorf("got %q want %q", got, "hello ack")
+	}
+}
+
+func TestRPCUnknownMethod(t *testing.T) {
+	a := makeNode(t, t.TempDir(), "node-a")
+	b := makeNode(t, t.TempDir(), "node-b")
+
+	addr := freeAddr(t)
+
+	a.trust(t, b, addr)
+	b.trust(t, a, addr)
+
+	srv := NewServer(a.assets)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go srv.Serve(ctx, addr)
+	defer srv.Stop()
+	if !waitForDial(addr, 2*time.Second) {
+		t.Fatal("server not up")
+	}
+
+	cli := NewClient(b.assets)
+	defer cli.Close()
+
+	// Bound the call so a misbehaving server fails the test instead of hanging it.
+	callCtx, callCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer callCancel()
+	err := cli.Call(callCtx, a.id, addr, "DoesNotExist", nil, nil)
+	if err == nil {
+		t.Fatal("expected error for unknown method")
+	}
+}
+
+func TestRPCRejectsUntrustedPeer(t *testing.T) {
+	a := makeNode(t, t.TempDir(), "node-a")
+	b := makeNode(t, t.TempDir(), "node-b")
+
+	addr := freeAddr(t)
+
+	// Deliberately omit a.trust(t, b, addr): the server (a) must not know b.
+	// The client (b) still trusts a, so any rejection comes from the server side.
+	b.trust(t, a, addr)
+
+	srv := NewServer(a.assets)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go srv.Serve(ctx, addr)
+	defer srv.Stop()
+	if !waitForDial(addr, 2*time.Second) {
+		t.Fatal("server not up")
+	}
+
+	cli := NewClient(b.assets)
+	defer cli.Close()
+
+	callCtx, callCancel := context.WithTimeout(ctx, 2*time.Second)
+	defer callCancel()
+	if err := cli.Call(callCtx, a.id, addr, "Ping", nil, nil); err == nil {
+		t.Error("untrusted client was admitted")
+	}
+}
+
+// waitForDial polls a TCP listener until it accepts a plain TCP
+// connection, signalling that Serve has begun listening.
+func waitForDial(addr string, timeout time.Duration) bool {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		c, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
+		if err == nil {
+			_ = c.Close()
+			return true
+		}
+		time.Sleep(20 * time.Millisecond)
+	}
+	return false
+}
diff --git a/internal/trust/store_test.go b/internal/trust/store_test.go
new file mode 100644
index 0000000..ece2cd6
--- /dev/null
+++ b/internal/trust/store_test.go
@@ -0,0 +1,115 @@
+package trust
+
+import (
+	"crypto/x509"
+	"encoding/pem"
+	"testing"
+
+	"github.com/jasper/quptime/internal/crypto"
+)
+
+func TestRoundtripAndLookup(t *testing.T) {
+	t.Setenv("QUPTIME_DIR", t.TempDir())
+	s, err := Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(s.List()) != 0 {
+		t.Error("expected empty store")
+	}
+
+	if err := s.Add(Entry{NodeID: "n1", Address: "10.0.0.1:9001", Fingerprint: "sha256:abc"}); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.Add(Entry{NodeID: "n2", Address: "10.0.0.2:9001", Fingerprint: "sha256:def"}); err != nil {
+		t.Fatal(err)
+	}
+
+	s2, err := Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(s2.List()) != 2 {
+		t.Errorf("got %d entries after reload", len(s2.List()))
+	}
+	if e, ok := s2.Get("n1"); !ok || e.Fingerprint != "sha256:abc" {
+		t.Errorf("Get(n1) = %+v ok=%v", e, ok)
+	}
+	if e, ok := s2.LookupByFingerprint("sha256:def"); !ok || e.NodeID != "n2" {
+		t.Errorf("LookupByFingerprint = %+v ok=%v", e, ok)
+	}
+
+	removed, err := s2.Remove("n1")
+	if err != nil || !removed {
+		t.Fatalf("Remove returned %v err=%v", removed, err)
+	}
+	if _, ok := s2.Get("n1"); ok {
+		t.Error("entry still present after Remove")
+	}
+
+	s3, err := Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, ok := s3.Get("n1"); ok {
+		t.Error("Remove did not persist")
+	}
+}
+
+func TestAddRequiresIDAndFingerprint(t *testing.T) {
+	t.Setenv("QUPTIME_DIR", t.TempDir())
+	s, err := Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := s.Add(Entry{NodeID: "n1"}); err == nil {
+		t.Error("missing fingerprint should error")
+	}
+	if err := s.Add(Entry{Fingerprint: "fp"}); err == nil {
+		t.Error("missing node id should error")
+	}
+}
+
+func TestVerifyPeerCertPinsFingerprint(t *testing.T) {
+	t.Setenv("QUPTIME_DIR", t.TempDir())
+	if _, err := crypto.GenerateKeyPair("peer-1"); err != nil {
+		t.Fatal(err)
+	}
+	certPEM, err := crypto.LoadCertPEM()
+	if err != nil {
+		t.Fatal(err)
+	}
+	block, _ := pem.Decode(certPEM)
+	if block == nil {
+		t.Fatal("no PEM block in cert")
+	}
+	cert, err := x509.ParseCertificate(block.Bytes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	fp := crypto.Fingerprint(cert)
+
+	s, err := Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Untrusted: should reject.
+	if err := s.VerifyPeerCert([][]byte{cert.Raw}, nil); err == nil {
+		t.Error("untrusted cert was accepted")
+	}
+
+	if err := s.Add(Entry{NodeID: "peer-1", Fingerprint: fp}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Now trusted.
+	if err := s.VerifyPeerCert([][]byte{cert.Raw}, nil); err != nil {
+		t.Errorf("trusted cert rejected: %v", err)
+	}
+
+	// No certs presented at all should error.
+	if err := s.VerifyPeerCert(nil, nil); err == nil {
+		t.Error("empty cert chain was accepted")
+	}
+}