From ed25e9ed6852336f61e2618c808d141c2f77def4 Mon Sep 17 00:00:00 2001 From: Axodouble Date: Fri, 15 May 2026 07:32:15 +0000 Subject: [PATCH] Fix #3 by adding a cooldown to the master election process --- CHANGELOG.md | 9 +++ internal/quorum/manager.go | 66 +++++++++++++++-- internal/quorum/manager_test.go | 121 ++++++++++++++++++++++++++++++++ 3 files changed, 189 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2afb79..055b761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ All notable changes to this project are documented here. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Fixed + +- #3 Master going up in the same window as service going up moves unknown -> online to ignore alert +Added a cooldown to the master election process. +- #1 Previously up services are alerted as going back up if the master goes down +Ignore `unknown` -> `online` transitions during master election cooldown. + ## [v0.0.2] — 2026-05-15 ### Fixed diff --git a/internal/quorum/manager.go b/internal/quorum/manager.go index 66e6786..ae84e99 100644 --- a/internal/quorum/manager.go +++ b/internal/quorum/manager.go @@ -34,6 +34,12 @@ import ( const ( DefaultHeartbeatInterval = 1 * time.Second DefaultDeadAfter = 4 * time.Second + // DefaultMasterCooldown is the grace period a returning peer must + // stay continuously live before it's allowed to displace the + // currently-elected master. Without it, a self-monitoring master + // that briefly drops would reclaim the role immediately on return + // and disrupt anything watching its TCP port. 
+ DefaultMasterCooldown = 2 * time.Minute ) // VersionObserver is invoked whenever a heartbeat exchange reveals @@ -50,12 +56,14 @@ type Manager struct { heartbeatInterval time.Duration deadAfter time.Duration + masterCooldown time.Duration - mu sync.RWMutex - term uint64 - masterID string - lastSeen map[string]time.Time // peerID -> last contact (sent or recv) - addrOf map[string]string // peerID -> advertise addr (last known) + mu sync.RWMutex + term uint64 + masterID string + lastSeen map[string]time.Time // peerID -> last contact (sent or recv) + liveSince map[string]time.Time // peerID -> start of current liveness streak + addrOf map[string]string // peerID -> advertise addr (last known) observer VersionObserver } @@ -70,7 +78,9 @@ func New(selfID string, cluster *config.ClusterConfig, client *transport.Client) client: client, heartbeatInterval: DefaultHeartbeatInterval, deadAfter: DefaultDeadAfter, + masterCooldown: DefaultMasterCooldown, lastSeen: map[string]time.Time{}, + liveSince: map[string]time.Time{}, addrOf: map[string]string{}, } } @@ -242,7 +252,15 @@ func (m *Manager) tick(ctx context.Context) { func (m *Manager) markLive(id string) { m.mu.Lock() - m.lastSeen[id] = time.Now() + now := time.Now() + prev, ok := m.lastSeen[id] + // A peer entering its first liveness streak — or returning after + // the dead-after window expired — resets liveSince. Subsequent + // heartbeats within the streak leave it untouched. + if !ok || now.Sub(prev) > m.deadAfter { + m.liveSince[id] = now + } + m.lastSeen[id] = now m.mu.Unlock() } @@ -276,7 +294,41 @@ func (m *Manager) recomputeMaster() { var newMaster string if len(live) >= quorum && len(live) > 0 { - newMaster = live[0] // lowest NodeID wins + // Without an incumbent the cluster is bootstrapping or + // has just regained quorum, so elect immediately — there's + // nothing to protect from a handoff. 
+ if m.masterID == "" { + newMaster = live[0] + } else { + newMaster = m.masterID + now := time.Now() + incumbentLive := false + for _, id := range live { + if id == m.masterID { + incumbentLive = true + break + } + } + // If the incumbent is no longer live, any live peer + // may take over without waiting. + if !incumbentLive { + newMaster = live[0] + } else { + // Incumbent is live. A peer with a lower NodeID + // may only displace it after it has stayed + // continuously live for masterCooldown. + for _, id := range live { + if id >= m.masterID { + break // sorted ascending — nobody lower left + } + since, ok := m.liveSince[id] + if ok && now.Sub(since) >= m.masterCooldown { + newMaster = id + break + } + } + } + } } if newMaster != m.masterID { m.term++ diff --git a/internal/quorum/manager_test.go b/internal/quorum/manager_test.go index 833e503..5cf7654 100644 --- a/internal/quorum/manager_test.go +++ b/internal/quorum/manager_test.go @@ -119,6 +119,127 @@ func TestDeadAfterEvictsStaleLiveness(t *testing.T) { } } +// heartbeatLoop simulates the production heartbeat cadence — calling +// markLive for the given peers more frequently than deadAfter, so a +// peer that's "live throughout" never has its liveSince reset by the +// dead-after gap heuristic. It returns once dur has elapsed, measured +// on the wall clock. +func heartbeatLoop(t *testing.T, m *Manager, dur time.Duration, peers ...string) { + t.Helper() + deadline := time.Now().Add(dur) + interval := m.deadAfter / 4 + if interval < time.Millisecond { + interval = time.Millisecond + } + for time.Now().Before(deadline) { + for _, p := range peers { + m.markLive(p) + } + m.recomputeMaster() + time.Sleep(interval) + } +} + +func TestReturningLowerIDWaitsForCooldown(t *testing.T) { + _, m := threeNode("b") + m.deadAfter = 80 * time.Millisecond + m.masterCooldown = 200 * time.Millisecond + + // Bootstrap: all three live, "a" elected. 
+ m.markLive("a") + m.markLive("b") + m.markLive("c") + m.recomputeMaster() + if m.Master() != "a" { + t.Fatalf("initial master=%q want a", m.Master()) + } + + // "a" drops — only b/c heartbeat. Long enough to age a out and let + // b take over. + heartbeatLoop(t, m, 120*time.Millisecond, "b", "c") + if m.Master() != "b" { + t.Fatalf("after a-drop master=%q want b", m.Master()) + } + + // "a" returns. Verify b stays master for less than the cooldown. + heartbeatLoop(t, m, 120*time.Millisecond, "a", "b", "c") + if m.Master() != "b" { + t.Errorf("mid-cooldown master=%q want b", m.Master()) + } + + // Past the cooldown, a reclaims master. + heartbeatLoop(t, m, 120*time.Millisecond, "a", "b", "c") + if m.Master() != "a" { + t.Errorf("after cooldown master=%q want a", m.Master()) + } +} + +func TestCooldownResetsOnFlap(t *testing.T) { + _, m := threeNode("b") + m.deadAfter = 80 * time.Millisecond + m.masterCooldown = 200 * time.Millisecond + + m.markLive("a") + m.markLive("b") + m.markLive("c") + m.recomputeMaster() + + // a drops, b becomes master. + heartbeatLoop(t, m, 120*time.Millisecond, "b", "c") + if m.Master() != "b" { + t.Fatalf("master=%q want b", m.Master()) + } + + // a returns briefly, then drops again before cooldown elapses. + heartbeatLoop(t, m, 100*time.Millisecond, "a", "b", "c") + if m.Master() != "b" { + t.Fatalf("during first cooldown master=%q want b", m.Master()) + } + heartbeatLoop(t, m, 120*time.Millisecond, "b", "c") // a ages out again + if m.Master() != "b" { + t.Fatalf("after a-reflap master=%q want b", m.Master()) + } + + // a returns for the second time — cooldown restarts here. + // Wait less than a full cooldown — b should still be master. + heartbeatLoop(t, m, 100*time.Millisecond, "a", "b", "c") + if m.Master() != "b" { + t.Errorf("partway through fresh cooldown master=%q want b", m.Master()) + } + + // Past the full fresh cooldown, a takes over. 
+ heartbeatLoop(t, m, 150*time.Millisecond, "a", "b", "c") + if m.Master() != "a" { + t.Errorf("after fresh cooldown master=%q want a", m.Master()) + } +} + +func TestNewMasterAfterQuorumLossIgnoresCooldown(t *testing.T) { + _, m := threeNode("b") + m.deadAfter = 50 * time.Millisecond + m.masterCooldown = 1 * time.Hour // would block election if applied + + // Bootstrap into no-master state by letting all peers age out. + m.markLive("a") + m.markLive("b") + m.markLive("c") + m.recomputeMaster() + time.Sleep(80 * time.Millisecond) + m.markLive("b") + m.recomputeMaster() + if m.Master() != "" { + t.Fatalf("master=%q want empty (quorum lost)", m.Master()) + } + + // Quorum regained — incumbent is empty, election must be immediate. + m.markLive("a") + m.markLive("b") + m.recomputeMaster() + if m.Master() != "a" { + t.Errorf("post-recovery master=%q want a (no cooldown when empty)", m.Master()) + } +} + func TestVersionObserverFiresOnHigherVersion(t *testing.T) { cluster := &config.ClusterConfig{Version: 2} m := New("a", cluster, nil)