From 46abc09b11c03f725a255d46fb8b6861c1e5c302 Mon Sep 17 00:00:00 2001
From: Axodouble
Date: Tue, 12 May 2026 07:51:20 +0000
Subject: [PATCH] Added secrets to setting up the cluster, updated default
 port, and fixed some issues when joining nodes async

---
 README.md                             | 47 +++++++++++++++++------
 install.sh                            | 24 ++++++++++++
 internal/cli/init.go                  | 55 +++++++++++++++++++++++----
 internal/cli/status.go                |  9 ++++-
 internal/config/cluster.go            | 48 ++++++++++++++++++++---
 internal/config/node.go               | 11 +++++-
 internal/config/node_test.go          | 16 ++++----
 internal/daemon/control.go            | 14 ++++---
 internal/daemon/daemon.go             | 34 +++++++++++++++++
 internal/daemon/handlers.go           | 13 ++++---
 internal/quorum/manager.go            | 51 +++++++++++++++++++++----
 internal/quorum/manager_test.go       | 20 ++++++++++
 internal/replicate/replicator_test.go |  2 +-
 internal/transport/messages.go        | 27 ++++++++-----
 internal/trust/store_test.go          |  4 +-
 15 files changed, 308 insertions(+), 67 deletions(-)
 create mode 100644 install.sh

diff --git a/README.md b/README.md
index 1d69939..d7fd854 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ on truth. If one of them loses its uplink, the rest keep alerting.
 ```
 +-------------- node A ---------------+
 | qu serve                            |
-| ├─ transport server (mTLS :9001)    |
+| ├─ transport server (mTLS :9901)    |
 | ├─ quorum manager (heartbeats)      |
 | ├─ replicator (cluster.yaml)        |
 | ├─ scheduler (HTTP/TCP/ICMP)        | <─── probes targets
@@ -73,24 +73,47 @@ git push --tags
 
 ## Set up a 3-node cluster
 
-On each host:
+On the **first host**:
 
 ```sh
-# 1. Generate identity + RSA-3072 keypair + self-signed cert.
-qu init --advertise :9001
+qu init --advertise alpha.example.com:9901
+```
 
-# 2. Start the daemon (foreground; wire it into systemd for prod).
+That prints a random cluster secret. Copy it.
+
+On every **other host**, pass that secret via `--secret`:
+
+```sh
+qu init --advertise bravo.example.com:9901 --secret '<secret>'
+qu init --advertise charlie.example.com:9901 --secret '<secret>'
+```
+
+Without the matching secret a node cannot join, so random hosts that
+can reach :9901 are safely ignored.
+
+Start the daemon on every host (foreground; wire into systemd for prod):
+
+```sh
 qu serve
 ```
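+
+A minimal systemd unit for that might look like this (install path and
+service user are illustrative, not shipped with the repo):
+
+```ini
+[Unit]
+Description=quptime node (qu serve)
+After=network-online.target
+Wants=network-online.target
+
+[Service]
+ExecStart=/usr/local/bin/qu serve
+Restart=on-failure
+User=qu
+
+[Install]
+WantedBy=multi-user.target
+```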
 
-Pick one node and tell it about the other two. The CLI prints the
-remote fingerprint and asks for confirmation, SSH-style:
+Then on one node — usually `alpha` — invite the others. The CLI prints
+each remote's fingerprint and asks for confirmation, SSH-style:
 
 ```sh
-qu node add bravo.example.com:9001
-qu node add charlie.example.com:9001
+qu node add bravo.example.com:9901
+qu node add charlie.example.com:9901
 ```
 
+After the first invite, give it a few seconds for heartbeats to bring
+the new peer into the live set before inviting the next one — otherwise
+the local node's "needs ≥2 live to mutate" check will reject the
+second add.
+
+You only need to invite from one node. Peer certs ride along with the
+replicated `cluster.yaml`, so every peer auto-trusts every other peer
+without `N×(N-1)` invites.
+
 That's it — the master broadcasts the new cluster config to every
 trusting peer. `qu status` from any node should now show all three:
@@ -103,9 +126,9 @@ config ver 4
 
 PEERS
-NODE_ID  ADVERTISE                 LIVE  LAST_SEEN
-a7f3...  alpha.example.com:9001    true  2026-05-12T15:01:32Z
-b21c...  bravo.example.com:9001    true  2026-05-12T15:01:32Z
-c0d4...  charlie.example.com:9001  true  2026-05-12T15:01:32Z
+   NODE_ID  ADVERTISE                 LIVE  LAST_SEEN
+*  a7f3...  alpha.example.com:9901    true  2026-05-12T15:01:32Z
+   b21c...  bravo.example.com:9901    true  2026-05-12T15:01:32Z
+   c0d4...  charlie.example.com:9901  true  2026-05-12T15:01:32Z
+(* = this node)
 ```
 
 ## Adding checks and alerts
diff --git a/install.sh b/install.sh
new file mode 100644
index 0000000..6fe9435
--- /dev/null
+++ b/install.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+# Check if ~/.local/bin exists, if not, create it
+if [ ! -d "$HOME/.local/bin" ]; then
+    mkdir -p "$HOME/.local/bin"
+fi
+
+# Check if ~/.local/bin is in the PATH, if not, give the user a command to add it
+if [[ ":$PATH:" != *":$HOME/.local/bin:"* ]]; then
+    echo "Please add the following line to your shell configuration file (e.g., ~/.bashrc, ~/.zshrc) to include ~/.local/bin in your PATH:"
+    echo 'export PATH="$HOME/.local/bin:$PATH"'
+    echo "After adding the line, please restart your terminal or run 'source ~/.bashrc' (or the appropriate command for your shell) to apply the changes."
+fi
+
+# Download the binary from git.cer.sh/axodouble/quptime
+# Check whether curl or wget is available
+if command -v curl > /dev/null; then
+    curl -L -o "$HOME/.local/bin/quptime" "https://git.cer.sh/axodouble/quptime/-/raw/main/quptime"
+elif command -v wget > /dev/null; then
+    wget -O "$HOME/.local/bin/quptime" "https://git.cer.sh/axodouble/quptime/-/raw/main/quptime"
+else
+    echo "Error: Neither curl nor wget is installed. Please install one of these tools to download the quptime binary."
+    exit 1
+fi
+
+# The raw download is not executable by default; mark it so quptime can run.
+chmod +x "$HOME/.local/bin/quptime"
diff --git a/internal/cli/init.go b/internal/cli/init.go
index 59de50c..085d28c 100644
--- a/internal/cli/init.go
+++ b/internal/cli/init.go
@@ -1,6 +1,8 @@
 package cli
 
 import (
+	"crypto/rand"
+	"encoding/base64"
 	"errors"
 	"fmt"
 	"os"
@@ -16,6 +18,7 @@ func addInitCmd(root *cobra.Command) {
 	var advertise string
 	var bindAddr string
 	var bindPort int
+	var clusterSecret string
 
 	cmd := &cobra.Command{
 		Use: "init",
@@ -23,6 +26,10 @@
 		Long: `Initialise a new qu node on this host: pick a UUID, generate
 an RSA keypair, write a default node.yaml, and prepare the trust store.
 
+Pass --secret on every subsequent node so they share the same
+cluster join secret. If --secret is omitted on the very first node, a
+random secret is generated and printed for the operator to copy.
+
 Idempotent in one direction only: existing key material is never
 overwritten. Re-run only after wiping the data directory.`,
 		RunE: func(cmd *cobra.Command, args []string) error {
@@ -32,12 +39,25 @@ overwritten. Re-run only after wiping the data directory.`,
 			if _, err := os.Stat(config.NodeFilePath()); err == nil {
 				return errors.New("node.yaml already exists in data dir — refusing to overwrite")
 			}
+
+			secret := clusterSecret
+			generated := false
+			if secret == "" {
+				s, err := generateSecret()
+				if err != nil {
+					return fmt.Errorf("generate cluster secret: %w", err)
+				}
+				secret = s
+				generated = true
+			}
+
 			nodeID := uuid.NewString()
 			n := &config.NodeConfig{
-				NodeID:    nodeID,
-				BindAddr:  bindAddr,
-				BindPort:  bindPort,
-				Advertise: advertise,
+				NodeID:        nodeID,
+				BindAddr:      bindAddr,
+				BindPort:      bindPort,
+				Advertise:     advertise,
+				ClusterSecret: secret,
 			}
 			if err := n.Save(); err != nil {
 				return fmt.Errorf("save node.yaml: %w", err)
 			}
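For illustration, a first-node `qu init` run then prints something like
this (node ID, directory, and secret are shortened placeholders; exact
values vary per install):

```
$ qu init --advertise alpha.example.com:9901
initialised node a7f3...
data dir: /home/ops/.quptime
advertise: alpha.example.com:9901

cluster secret (copy to every other node via --secret):
 mXzG4...
```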
@@ -66,20 +86,39 @@ overwritten. Re-run only after wiping the data directory.`,
 				NodeID:      nodeID,
 				Advertise:   n.AdvertiseAddr(),
 				Fingerprint: fp,
+				CertPEM:     string(certPEM),
 			}}
 				return nil
 			}); err != nil {
 				return fmt.Errorf("seed cluster.yaml: %w", err)
 			}
-			fmt.Fprintf(cmd.OutOrStdout(), "initialised node %s\n", nodeID)
-			fmt.Fprintf(cmd.OutOrStdout(), "data dir: %s\n", config.DataDir())
-			fmt.Fprintf(cmd.OutOrStdout(), "advertise: %s\n", n.AdvertiseAddr())
+			out := cmd.OutOrStdout()
+			fmt.Fprintf(out, "initialised node %s\n", nodeID)
+			fmt.Fprintf(out, "data dir: %s\n", config.DataDir())
+			fmt.Fprintf(out, "advertise: %s\n", n.AdvertiseAddr())
+			if generated {
+				fmt.Fprintln(out)
+				fmt.Fprintln(out, "cluster secret (copy to every other node via --secret):")
+				fmt.Fprintln(out, " "+secret)
+			}
 			return nil
 		},
 	}
 	cmd.Flags().StringVar(&advertise, "advertise", "", "address peers should use to reach this node (host:port)")
 	cmd.Flags().StringVar(&bindAddr, "bind", "0.0.0.0", "listen address for inter-node traffic")
-	cmd.Flags().IntVar(&bindPort, "port", 9001, "listen port for inter-node traffic")
+	cmd.Flags().IntVar(&bindPort, "port", 9901, "listen port for inter-node traffic")
+	cmd.Flags().StringVar(&clusterSecret, "secret", "", "shared cluster join secret (omit on the first node to auto-generate)")
 	root.AddCommand(cmd)
 }
+
+// generateSecret produces 32 bytes of crypto-random data and returns
+// it base64-encoded. Long enough that brute force isn't a concern;
+// short enough that operators can copy-paste it without pagination.
+func generateSecret() (string, error) {
+	b := make([]byte, 32)
+	if _, err := rand.Read(b); err != nil {
+		return "", err
+	}
+	return base64.RawURLEncoding.EncodeToString(b), nil
+}
diff --git a/internal/cli/status.go b/internal/cli/status.go
index e547eb7..2553d1b 100644
--- a/internal/cli/status.go
+++ b/internal/cli/status.go
@@ -47,15 +47,20 @@ func runStatusPrint(ctx context.Context, cmd *cobra.Command, peersOnly bool) err
 	fmt.Fprintln(out)
 	fmt.Fprintln(out, "PEERS")
 	tw := tabwriter.NewWriter(out, 0, 0, 2, ' ', 0)
-	fmt.Fprintln(tw, "NODE_ID\tADVERTISE\tLIVE\tLAST_SEEN")
+	fmt.Fprintln(tw, "\tNODE_ID\tADVERTISE\tLIVE\tLAST_SEEN")
 	for _, p := range st.Peers {
 		lastSeen := "-"
 		if !p.LastSeen.IsZero() {
 			lastSeen = p.LastSeen.Format(time.RFC3339)
 		}
-		fmt.Fprintf(tw, "%s\t%s\t%v\t%s\n", p.NodeID, p.Advertise, p.Live, lastSeen)
+		marker := " "
+		if p.NodeID == st.NodeID {
+			marker = "*"
+		}
+		fmt.Fprintf(tw, "%s\t%s\t%s\t%v\t%s\n", marker, p.NodeID, p.Advertise, p.Live, lastSeen)
 	}
 	tw.Flush()
+	fmt.Fprintln(out, "(* = this node)")
 
 	if peersOnly {
 		return nil
diff --git a/internal/config/cluster.go b/internal/config/cluster.go
index cff6648..8608c15 100644
--- a/internal/config/cluster.go
+++ b/internal/config/cluster.go
@@ -10,11 +10,18 @@ import (
 )
 
 // PeerInfo identifies a cluster member as known to all peers.
-// (Trust material lives in trust.yaml; this struct stays portable.)
+//
+// CertPEM rides along so the daemon can populate trust.yaml when a
+// new node joins: a follower receiving an updated cluster.yaml from
+// the master trusts the master, and therefore trusts the peer
+// certificates it forwards. Without this, mTLS between new and old
+// peers would never succeed because neither would have the other in
+// its trust store.
 type PeerInfo struct {
 	NodeID      string `yaml:"node_id"`
 	Advertise   string `yaml:"advertise"`
 	Fingerprint string `yaml:"fingerprint"`
+	CertPEM     string `yaml:"cert_pem,omitempty"`
 }
 
 // CheckType enumerates the supported probe kinds.
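For illustration, a peer entry in the replicated cluster.yaml now
carries its certificate inline (all values here are placeholders):

```yaml
peers:
  - node_id: b21c4f6e-...
    advertise: bravo.example.com:9901
    fingerprint: sha256:def...
    cert_pem: |
      -----BEGIN CERTIFICATE-----
      MIIB...
      -----END CERTIFICATE-----
```

Followers that trust the master can therefore trust the forwarded
certificates too, which is what `syncTrustFromCluster` below relies on.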
@@ -83,7 +90,27 @@ type ClusterConfig struct {
 	Checks []Check `yaml:"checks"`
 	Alerts []Alert `yaml:"alerts"`
 
-	mu sync.RWMutex `yaml:"-"`
+	mu       sync.RWMutex `yaml:"-"`
+	onChange []func() // fired after any successful Mutate/Replace
+}
+
+// OnChange registers a callback fired after every successful Mutate
+// or Replace. Callbacks run synchronously on the mutating goroutine
+// AFTER the lock is released — they may safely call back into the
+// config to read snapshots.
+func (c *ClusterConfig) OnChange(fn func()) {
+	c.mu.Lock()
+	c.onChange = append(c.onChange, fn)
+	c.mu.Unlock()
+}
+
+func (c *ClusterConfig) fireOnChange() {
+	c.mu.RLock()
+	cbs := append([]func(){}, c.onChange...)
+	c.mu.RUnlock()
+	for _, fn := range cbs {
+		fn()
+	}
 }
 
 // LoadClusterConfig reads cluster.yaml. A missing file returns an
@@ -136,8 +163,8 @@ func (c *ClusterConfig) Snapshot() *ClusterConfig {
 // success, and writes the file. Only the master should call this.
 func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) error {
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if err := fn(c); err != nil {
+		c.mu.Unlock()
 		return err
 	}
 	c.Version++
@@ -145,9 +172,16 @@ func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) err
 	c.UpdatedBy = byNode
 	out, err := yaml.Marshal(c)
 	if err != nil {
+		c.mu.Unlock()
 		return err
 	}
-	return AtomicWrite(ClusterFilePath(), out, 0o600)
+	if err := AtomicWrite(ClusterFilePath(), out, 0o600); err != nil {
+		c.mu.Unlock()
+		return err
+	}
+	c.mu.Unlock()
+	c.fireOnChange()
+	return nil
 }
 
 // Replace overwrites the local config with an incoming snapshot if
@@ -155,8 +189,8 @@ func (c *ClusterConfig) Mutate(byNode string, fn func(*ClusterConfig) error) err
 // applied.
 func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) {
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if incoming.Version <= c.Version {
+		c.mu.Unlock()
 		return false, nil
 	}
 	c.Version = incoming.Version
@@ -167,11 +201,15 @@ func (c *ClusterConfig) Replace(incoming *ClusterConfig) (bool, error) {
 	c.Alerts = append([]Alert(nil), incoming.Alerts...)
 	out, err := yaml.Marshal(c)
 	if err != nil {
+		c.mu.Unlock()
 		return false, err
 	}
 	if err := AtomicWrite(ClusterFilePath(), out, 0o600); err != nil {
+		c.mu.Unlock()
 		return false, err
 	}
+	c.mu.Unlock()
+	c.fireOnChange()
 	return true, nil
 }
diff --git a/internal/config/node.go b/internal/config/node.go
index 3c4c27e..41993f5 100644
--- a/internal/config/node.go
+++ b/internal/config/node.go
@@ -17,12 +17,19 @@ type NodeConfig struct {
 	// traffic. Defaults to 0.0.0.0.
 	BindAddr string `yaml:"bind_addr"`
 
-	// BindPort is the port the daemon listens on. Default 9001.
+	// BindPort is the port the daemon listens on. Default 9901.
 	BindPort int `yaml:"bind_port"`
 
 	// Advertise is the address other nodes use to reach us. May differ
 	// from BindAddr when behind NAT. Set explicitly via `qu init --advertise`.
 	Advertise string `yaml:"advertise"`
+
+	// ClusterSecret is the pre-shared secret every node in the cluster
+	// must present during the Join RPC. Without it any operator who
+	// can reach :9901 could enrol themselves into the cluster, so we
+	// require an out-of-band copy at `qu init` time. Stored locally
+	// only, never replicated.
+	ClusterSecret string `yaml:"cluster_secret"`
 }
 
 // AdvertiseAddr returns the address peers should dial. Falls back to
@@ -49,7 +56,7 @@ func LoadNodeConfig() (*NodeConfig, error) {
 		return nil, fmt.Errorf("parse node.yaml: %w", err)
 	}
 	if cfg.BindPort == 0 {
-		cfg.BindPort = 9001
+		cfg.BindPort = 9901
 	}
 	if cfg.BindAddr == "" {
 		cfg.BindAddr = "0.0.0.0"
diff --git a/internal/config/node_test.go b/internal/config/node_test.go
index 4118ebf..9c37fad 100644
--- a/internal/config/node_test.go
+++ b/internal/config/node_test.go
@@ -8,11 +8,11 @@ func TestAdvertiseAddrFallback(t *testing.T) {
 		cfg  NodeConfig
 		want string
 	}{
-		{"explicit advertise wins", NodeConfig{Advertise: "host:1234", BindAddr: "0.0.0.0", BindPort: 9001}, "host:1234"},
-		{"empty bind falls back to loopback", NodeConfig{BindPort: 9001}, "127.0.0.1:9001"},
-		{"wildcard bind falls back to loopback", NodeConfig{BindAddr: "0.0.0.0", BindPort: 9001}, "127.0.0.1:9001"},
-		{"ipv6 wildcard falls back to loopback", NodeConfig{BindAddr: "::", BindPort: 9001}, "127.0.0.1:9001"},
-		{"specific bind preserved", NodeConfig{BindAddr: "10.0.0.1", BindPort: 9001}, "10.0.0.1:9001"},
+		{"explicit advertise wins", NodeConfig{Advertise: "host:1234", BindAddr: "0.0.0.0", BindPort: 9901}, "host:1234"},
+		{"empty bind falls back to loopback", NodeConfig{BindPort: 9901}, "127.0.0.1:9901"},
+		{"wildcard bind falls back to loopback", NodeConfig{BindAddr: "0.0.0.0", BindPort: 9901}, "127.0.0.1:9901"},
+		{"ipv6 wildcard falls back to loopback", NodeConfig{BindAddr: "::", BindPort: 9901}, "127.0.0.1:9901"},
+		{"specific bind preserved", NodeConfig{BindAddr: "10.0.0.1", BindPort: 9901}, "10.0.0.1:9901"},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
@@ -25,7 +25,7 @@ func TestAdvertiseAddrFallback(t *testing.T) {
 
 func TestNodeConfigRoundtrip(t *testing.T) {
 	t.Setenv("QUPTIME_DIR", t.TempDir())
-	n := &NodeConfig{NodeID: "abc", BindAddr: "127.0.0.1", BindPort: 9001, Advertise: "10.0.0.1:9001"}
+	n := &NodeConfig{NodeID: "abc", BindAddr: "127.0.0.1", BindPort: 9901, Advertise: "10.0.0.1:9901"}
 	if err := n.Save(); err != nil {
 		t.Fatal(err)
 	}
@@ -49,8 +49,8 @@ func TestLoadNodeConfigAppliesDefaults(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if loaded.BindPort != 9001 {
-		t.Errorf("BindPort=%d want 9001", loaded.BindPort)
+	if loaded.BindPort != 9901 {
+		t.Errorf("BindPort=%d want 9901", loaded.BindPort)
 	}
 	if loaded.BindAddr != "0.0.0.0" {
 		t.Errorf("BindAddr=%q want 0.0.0.0", loaded.BindAddr)
diff --git a/internal/daemon/control.go b/internal/daemon/control.go
index 4b029e6..7c6f515 100644
--- a/internal/daemon/control.go
+++ b/internal/daemon/control.go
@@ -305,10 +305,11 @@ func (d *Daemon) nodeAdd(ctx context.Context, body NodeAddBody) (NodeAddResult,
 		return NodeAddResult{}, fmt.Errorf("own fingerprint: %w", err)
 	}
 	joinReq := transport.JoinRequest{
-		NodeID:      d.node.NodeID,
-		Advertise:   d.node.AdvertiseAddr(),
-		Fingerprint: myFP,
-		CertPEM:     string(d.assets.Cert),
+		NodeID:        d.node.NodeID,
+		Advertise:     d.node.AdvertiseAddr(),
+		Fingerprint:   myFP,
+		CertPEM:       string(d.assets.Cert),
+		ClusterSecret: d.node.ClusterSecret,
 	}
 	var joinResp transport.JoinResponse
 	if err := d.client.Call(ctx, peerID, body.Address, transport.MethodJoin, joinReq, &joinResp); err != nil {
@@ -319,11 +320,14 @@ func (d *Daemon) nodeAdd(ctx context.Context, body NodeAddBody) (NodeAddResult,
 	}
 
 	// Propose the cluster-config addition. Routed to master via the
-	// replicator; if we are the master, applied directly.
+	// replicator; if we are the master, applied directly. Including
+	// CertPEM lets other peers auto-trust this node once the new
+	// cluster.yaml reaches them.
 	peerInfo := config.PeerInfo{
 		NodeID:      peerID,
 		Advertise:   body.Address,
 		Fingerprint: sample.Fingerprint,
+		CertPEM:     string(sample.CertPEM),
 	}
 	ver, err := d.replicator.LocalMutate(ctx, transport.MutationAddPeer, peerInfo)
 	if err != nil {
diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go
index 48ed182..e1abe0b 100644
--- a/internal/daemon/daemon.go
+++ b/internal/daemon/daemon.go
@@ -103,6 +103,7 @@ func New(logger *log.Logger) (*Daemon, error) {
 	}
 
 	d.quorum = quorum.New(node.NodeID, cluster, client)
+	d.quorum.SetSelfAdvertise(node.AdvertiseAddr())
 	d.replicator = replicate.New(node.NodeID, cluster, client, d.quorum)
 	d.aggregator = checks.NewAggregator(cluster, nil)
 	d.dispatcher = alerts.New(cluster, node.NodeID, logger)
@@ -125,9 +126,42 @@ func New(logger *log.Logger) (*Daemon, error) {
 	d.scheduler = checks.NewScheduler(cluster, &sink{d: d})
 	d.control = newControlServer(d)
 	d.registerHandlers()
+
+	// Whenever cluster.yaml changes, mirror peer certs into the local
+	// trust store so this node can mTLS to every other peer — even
+	// peers it was never invited by directly.
+	cluster.OnChange(d.syncTrustFromCluster)
+	d.syncTrustFromCluster()
+
 	return d, nil
 }
 
+// syncTrustFromCluster makes sure every peer listed in cluster.yaml
+// has a corresponding trust entry. Trust entries are only added (not
+// removed) here — `qu node remove` is the explicit eviction path.
+func (d *Daemon) syncTrustFromCluster() {
+	snap := d.cluster.Snapshot()
+	for _, p := range snap.Peers {
+		if p.NodeID == "" || p.NodeID == d.node.NodeID {
+			continue
+		}
+		if p.Fingerprint == "" || p.CertPEM == "" {
+			continue // pre-1.0 peer entry without cert material — skip
+		}
+		if existing, ok := d.trust.Get(p.NodeID); ok && existing.Fingerprint == p.Fingerprint {
+			continue
+		}
+		if err := d.trust.Add(trust.Entry{
+			NodeID:      p.NodeID,
+			Address:     p.Advertise,
+			Fingerprint: p.Fingerprint,
+			CertPEM:     p.CertPEM,
+		}); err != nil {
+			d.logger.Printf("trust sync: %s: %v", p.NodeID, err)
+		}
+	}
+}
+
 // Run binds the inter-node listener and the local control socket,
 // starts the quorum loop and the scheduler, and blocks until ctx is
 // cancelled.
diff --git a/internal/daemon/handlers.go b/internal/daemon/handlers.go
index 6e51373..044ec2b 100644
--- a/internal/daemon/handlers.go
+++ b/internal/daemon/handlers.go
@@ -2,6 +2,7 @@ package daemon
 
 import (
 	"context"
+	"crypto/subtle"
 	"encoding/json"
 	"time"
 
@@ -38,6 +39,13 @@ func (d *Daemon) registerHandlers() {
 		if err := json.Unmarshal(raw, &req); err != nil {
 			return transport.JoinResponse{Error: err.Error()}, nil
 		}
+		// Constant-time secret check: every node in the cluster must
+		// present the same shared secret. This is the only barrier
+		// stopping a stranger who can reach :9901 from enrolling
+		// themselves with their own fresh key.
+		if subtle.ConstantTimeCompare([]byte(req.ClusterSecret), []byte(d.node.ClusterSecret)) != 1 {
+			return transport.JoinResponse{Error: "cluster secret mismatch"}, nil
+		}
 		fp, err := crypto.FingerprintFromCertPEM([]byte(req.CertPEM))
 		if err != nil {
 			return transport.JoinResponse{Error: "parse cert: " + err.Error()}, nil
@@ -45,11 +53,6 @@ func (d *Daemon) registerHandlers() {
 		if fp != req.Fingerprint {
 			return transport.JoinResponse{Error: "fingerprint mismatch"}, nil
 		}
-		// Outbound join (the proposing node already accepted our cert
-		// out of band). Symmetric trust is required for mTLS to work,
-		// so we accept the join automatically. Operators who need
-		// stricter onboarding can disable the listener and use the
-		// CLI flow exclusively.
 		if err := d.trust.Add(trust.Entry{
 			NodeID:      req.NodeID,
 			Address:     req.Advertise,
diff --git a/internal/quorum/manager.go b/internal/quorum/manager.go
index e2ee51f..66e6786 100644
--- a/internal/quorum/manager.go
+++ b/internal/quorum/manager.go
@@ -43,9 +43,10 @@ type VersionObserver func(peerID, peerAddr string, peerVersion uint64)
 
 // Manager coordinates heartbeats and master election for one node.
 type Manager struct {
-	selfID  string
-	cluster *config.ClusterConfig
-	client  *transport.Client
+	selfID        string
+	selfAdvertise string
+	cluster       *config.ClusterConfig
+	client        *transport.Client
 
 	heartbeatInterval time.Duration
 	deadAfter         time.Duration
@@ -74,6 +75,15 @@ func New(selfID string, cluster *config.ClusterConfig, client *transport.Client)
 	}
 }
 
+// SetSelfAdvertise records the address this node advertises to peers.
+// It's piggy-backed on every outbound heartbeat so the recipient can
+// reach us even before we appear in their cluster.yaml.
+func (m *Manager) SetSelfAdvertise(addr string) {
+	m.mu.Lock()
+	m.selfAdvertise = addr
+	m.mu.Unlock()
+}
+
 // SetVersionObserver registers a callback fired when a peer reports a
 // higher cluster-config version than ours.
 func (m *Manager) SetVersionObserver(fn VersionObserver) {
@@ -104,15 +114,24 @@ func (m *Manager) Start(ctx context.Context) {
 func (m *Manager) HandleHeartbeat(req transport.HeartbeatRequest) transport.HeartbeatResponse {
 	if req.FromNodeID != "" && req.FromNodeID != m.selfID {
 		m.markLive(req.FromNodeID)
+		if req.Advertise != "" {
+			m.mu.Lock()
+			m.addrOf[req.FromNodeID] = req.Advertise
+			m.mu.Unlock()
+		}
 		m.maybeNotifyVersion(req.FromNodeID, req.Version)
 	}
 	m.recomputeMaster()
-	v := m.cluster.Snapshot().Version
+	snap := m.cluster.Snapshot()
+	m.mu.RLock()
+	selfAdv := m.selfAdvertise
+	m.mu.RUnlock()
 	return transport.HeartbeatResponse{
-		NodeID:   m.selfID,
-		Term:     m.Term(),
-		MasterID: m.Master(),
-		Version:  v,
+		NodeID:    m.selfID,
+		Advertise: selfAdv,
+		Term:      m.Term(),
+		MasterID:  m.Master(),
+		Version:   snap.Version,
 	}
 }
 
@@ -183,6 +202,9 @@ func (m *Manager) tick(ctx context.Context) {
 	}
 
 	currentMaster := m.Master()
+	m.mu.RLock()
+	selfAdv := m.selfAdvertise
+	m.mu.RUnlock()
 	for _, p := range snap.Peers {
 		if p.NodeID == m.selfID || p.NodeID == "" || p.Advertise == "" {
 			continue
@@ -194,6 +216,7 @@ func (m *Manager) tick(ctx context.Context) {
 			defer cancel()
 			req := transport.HeartbeatRequest{
 				FromNodeID: m.selfID,
+				Advertise:  selfAdv,
 				Term:       m.Term(),
 				MasterID:   currentMaster,
 				Version:    snap.Version,
@@ -204,6 +227,11 @@ func (m *Manager) tick(ctx context.Context) {
 				return
 			}
 			m.markLive(peerID)
+			if resp.Advertise != "" {
+				m.mu.Lock()
+				m.addrOf[peerID] = resp.Advertise
+				m.mu.Unlock()
+			}
 			m.maybeNotifyVersion(peerID, resp.Version)
 		}(peerID, addr)
 	}
@@ -229,6 +257,13 @@ func (m *Manager) maybeNotifyVersion(peerID string, peerVer uint64) {
 	m.mu.RLock()
 	addr := m.addrOf[peerID]
 	m.mu.RUnlock()
+	// Without an address we can't pull — silently wait for a
+	// later heartbeat (or for the peer to land in cluster.yaml) to
+	// supply one. Firing the observer with addr == "" would just
+	// produce log spam from the replicator.
+	if addr == "" {
+		return
+	}
 	m.observer(peerID, addr, peerVer)
 }
 
diff --git a/internal/quorum/manager_test.go b/internal/quorum/manager_test.go
index be4616e..833e503 100644
--- a/internal/quorum/manager_test.go
+++ b/internal/quorum/manager_test.go
@@ -134,6 +134,12 @@ func TestVersionObserverFiresOnHigherVersion(t *testing.T) {
 		notified.count++
 	})
 
+	// Seed the address for "b" via an incoming heartbeat — the
+	// observer no-ops without one to avoid log spam.
+	m.HandleHeartbeat(transport.HeartbeatRequest{
+		FromNodeID: "b", Advertise: "10.0.0.2:9901", Version: 2,
+	})
+
 	m.maybeNotifyVersion("b", 5)
 	if notified.count != 1 || notified.peerID != "b" || notified.peerVer != 5 {
 		t.Errorf("expected observer fired with b=5, got %+v", notified)
@@ -144,3 +150,17 @@ func TestVersionObserverFiresOnHigherVersion(t *testing.T) {
 		t.Errorf("observer fired for stale version, count=%d", notified.count)
 	}
 }
+
+func TestVersionObserverSkippedWithoutAddress(t *testing.T) {
+	cluster := &config.ClusterConfig{Version: 0}
+	m := New("a", cluster, nil)
+
+	var fired int
+	m.SetVersionObserver(func(_, _ string, _ uint64) { fired++ })
+
+	// Peer "c" has never sent a heartbeat — no recorded address.
+	m.maybeNotifyVersion("c", 99)
+	if fired != 0 {
+		t.Errorf("observer fired without a known address: %d", fired)
+	}
+}
diff --git a/internal/replicate/replicator_test.go b/internal/replicate/replicator_test.go
index b4124c1..a434182 100644
--- a/internal/replicate/replicator_test.go
+++ b/internal/replicate/replicator_test.go
@@ -88,7 +88,7 @@ func TestApplyAddAndRemoveAlertAndPeer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	peer, _ := json.Marshal(config.PeerInfo{NodeID: "p1", Advertise: "10.0.0.1:9001", Fingerprint: "fp"})
+	peer, _ := json.Marshal(config.PeerInfo{NodeID: "p1", Advertise: "10.0.0.1:9901", Fingerprint: "fp"})
 	if _, err := r.LocalMutate(context.Background(), transport.MutationAddPeer, json.RawMessage(peer)); err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/transport/messages.go b/internal/transport/messages.go
index 158da13..710fe51 100644
--- a/internal/transport/messages.go
+++ b/internal/transport/messages.go
@@ -61,11 +61,16 @@ type WhoAmIResponse struct {
 // JoinRequest is sent by a node that has just learned the remote's
 // fingerprint out of band and wants the remote to record this node in
 // its own trust store too (so the relationship is symmetric).
+//
+// ClusterSecret is the pre-shared cluster join key. The recipient
+// compares it against its locally configured secret in constant time
+// and rejects the join on a mismatch.
 type JoinRequest struct {
-	NodeID      string `json:"node_id"`
-	Advertise   string `json:"advertise"`
-	Fingerprint string `json:"fingerprint"`
-	CertPEM     string `json:"cert_pem"`
+	NodeID        string `json:"node_id"`
+	Advertise     string `json:"advertise"`
+	Fingerprint   string `json:"fingerprint"`
+	CertPEM       string `json:"cert_pem"`
+	ClusterSecret string `json:"cluster_secret"`
 }
 
 // JoinResponse echoes a non-empty Error string when the remote refuses
@@ -77,9 +82,12 @@ type JoinResponse struct {
 
 // HeartbeatRequest is the periodic liveness ping sent over the
 // inter-node channel. It also carries the sender's view of who the
-// master is, so disagreements surface quickly.
+// master is, so disagreements surface quickly. Advertise lets the
+// recipient cache where to reach the sender, which matters when the
+// sender isn't yet in our cluster.yaml peers list (e.g. mid-bootstrap).
 type HeartbeatRequest struct {
 	FromNodeID string `json:"from_node_id"`
+	Advertise  string `json:"advertise"`
 	Term       uint64 `json:"term"`
 	MasterID   string `json:"master_id"`
 	Version    uint64 `json:"config_version"`
@@ -87,10 +95,11 @@
 
 // HeartbeatResponse is returned by MethodHeartbeat.
 type HeartbeatResponse struct {
-	NodeID   string `json:"node_id"`
-	Term     uint64 `json:"term"`
-	MasterID string `json:"master_id"`
-	Version  uint64 `json:"config_version"`
+	NodeID    string `json:"node_id"`
+	Advertise string `json:"advertise"`
+	Term      uint64 `json:"term"`
+	MasterID  string `json:"master_id"`
+	Version   uint64 `json:"config_version"`
 }
 
 // GetClusterCfgRequest fetches the responder's view of cluster.yaml.
diff --git a/internal/trust/store_test.go b/internal/trust/store_test.go
index d29df38..2e2fc69 100644
--- a/internal/trust/store_test.go
+++ b/internal/trust/store_test.go
@@ -18,10 +18,10 @@ func TestRoundtripAndLookup(t *testing.T) {
 		t.Error("expected empty store")
 	}
 
-	if err := s.Add(Entry{NodeID: "n1", Address: "10.0.0.1:9001", Fingerprint: "sha256:abc"}); err != nil {
+	if err := s.Add(Entry{NodeID: "n1", Address: "10.0.0.1:9901", Fingerprint: "sha256:abc"}); err != nil {
 		t.Fatal(err)
 	}
-	if err := s.Add(Entry{NodeID: "n2", Address: "10.0.0.2:9001", Fingerprint: "sha256:def"}); err != nil {
+	if err := s.Add(Entry{NodeID: "n2", Address: "10.0.0.2:9901", Fingerprint: "sha256:def"}); err != nil {
 		t.Fatal(err)
 	}
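Taken together, the secret's lifecycle in this patch is small enough to
sketch standalone. The snippet below mirrors `generateSecret` and the
constant-time gate in the Join handler; `newSecret` and `joinAllowed`
are illustrative names, not identifiers from the codebase:

```go
package main

import (
	"crypto/rand"
	"crypto/subtle"
	"encoding/base64"
	"fmt"
)

// newSecret mirrors the patch's generateSecret: 32 crypto-random
// bytes, base64url without padding, ~43 characters to copy-paste.
func newSecret() (string, error) {
	b := make([]byte, 32)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(b), nil
}

// joinAllowed mirrors the Join handler's gate: constant-time equality,
// so the comparison leaks nothing about how many bytes of a guess
// matched the real secret.
func joinAllowed(localSecret, presented string) bool {
	return subtle.ConstantTimeCompare([]byte(localSecret), []byte(presented)) == 1
}

func main() {
	s, err := newSecret()
	if err != nil {
		panic(err)
	}
	fmt.Println(joinAllowed(s, s))  // true: matching secret may enrol
	fmt.Println(joinAllowed(s, "")) // false: stranger is rejected
}
```

Note that `subtle.ConstantTimeCompare` returns early only on a length
mismatch, so timing reveals at most the secret's length, never its
contents.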