package checks

import (
	"sync"
	"time"

	"git.cer.sh/axodouble/quptime/internal/config"
)

// State is the aggregate verdict on one check.
type State string

const (
	StateUnknown State = "unknown"
	StateUp      State = "up"
	StateDown    State = "down"
)

// HysteresisCount is how many consecutive evaluations a candidate
// state must hold before becoming the new committed state. 2 matches
// the design doc ("down for ≥2 consecutive aggregate evaluations").
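// For example, with a committed state of "up", the first evaluation
// that computes "down" only makes "down" the candidate; a second
// consecutive "down" evaluation promotes it and fires the transition.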
const HysteresisCount = 2

// TransitionFn is fired when a check's committed state flips. The
// alert dispatcher is the production consumer.
type TransitionFn func(check *config.Check, from, to State, snap Snapshot)

// Snapshot summarises one check's current aggregate state.
type Snapshot struct {
	CheckID  string
	State    State
	Reports  int // number of fresh per-node results
	OKCount  int // number reporting OK
	NotOK    int // number reporting not-OK
	Detail   string
	UpdateAt time.Time
}

// Aggregator runs on the master only. Other nodes ship their probe
// results to it and it decides the cluster-wide truth.
type Aggregator struct {
	cluster    *config.ClusterConfig
	transition TransitionFn

	mu       sync.Mutex
	perCheck map[string]*checkState
}

type checkState struct {
	// nodeID → most recent result we've received from that node.
	latest map[string]nodeResult

	committed   State // last announced state (held through hysteresis)
	candidate   State // state being considered for promotion
	consecutive int   // ticks the candidate has persisted
}

type nodeResult struct {
	OK     bool
	Detail string
	At     time.Time
}

// NewAggregator returns an empty aggregator. fn may be nil during
// startup; SetTransition can wire it later.
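//
// A minimal wiring sketch (illustrative only: "node-a", "web", and the
// log call are assumptions, not part of this package; Result is the
// per-probe result type Submit already consumes):
//
//	agg := NewAggregator(cluster, nil)
//	agg.SetTransition(func(c *config.Check, from, to State, snap Snapshot) {
//		log.Printf("%s: %s -> %s (%d/%d ok)", c.ID, from, to, snap.OKCount, snap.Reports)
//	})
//	agg.Submit("node-a", Result{CheckID: "web", OK: true, Timestamp: time.Now()})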
func NewAggregator(cluster *config.ClusterConfig, fn TransitionFn) *Aggregator {
	return &Aggregator{
		cluster:    cluster,
		transition: fn,
		perCheck:   map[string]*checkState{},
	}
}

// SetTransition wires (or rewires) the transition callback.
func (a *Aggregator) SetTransition(fn TransitionFn) {
	a.mu.Lock()
	a.transition = fn
	a.mu.Unlock()
}

// Submit records one node's result for one check and immediately
// re-evaluates that check's aggregate state.
func (a *Aggregator) Submit(nodeID string, r Result) {
	if r.CheckID == "" {
		return
	}
	a.mu.Lock()
	st, ok := a.perCheck[r.CheckID]
	if !ok {
		st = &checkState{
			latest:    map[string]nodeResult{},
			committed: StateUnknown,
			candidate: StateUnknown,
		}
		a.perCheck[r.CheckID] = st
	}
	st.latest[nodeID] = nodeResult{OK: r.OK, Detail: r.Detail, At: r.Timestamp}
	a.mu.Unlock()
	a.evaluate(r.CheckID)
}

// SnapshotFor returns the aggregate for a single check.
func (a *Aggregator) SnapshotFor(checkID string) (Snapshot, bool) {
	a.mu.Lock()
	defer a.mu.Unlock()
	st, ok := a.perCheck[checkID]
	if !ok {
		return Snapshot{}, false
	}
	return a.snapshotLocked(checkID, st), true
}

func (a *Aggregator) evaluate(checkID string) {
	check := a.lookupCheck(checkID)
	if check == nil {
		// Check was removed from cluster.yaml — drop its state so we
		// don't keep alerting on something the operator deleted.
		a.mu.Lock()
		delete(a.perCheck, checkID)
		a.mu.Unlock()
		return
	}
	a.mu.Lock()
	st, ok := a.perCheck[checkID]
	if !ok {
		a.mu.Unlock()
		return
	}
	freshWindow := freshWindowFor(check.Interval)
	cutoff := time.Now().Add(-freshWindow)
	var okCount, notOK int
	for _, nr := range st.latest {
		if nr.At.Before(cutoff) {
			continue
		}
		if nr.OK {
			okCount++
		} else {
			notOK++
		}
	}
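	// Majority vote over fresh reports: no reports at all means
	// unknown; strictly more failures than successes means down, so a
	// tie leans up.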
	var candidate State
	switch {
	case okCount+notOK == 0:
		candidate = StateUnknown
	case notOK > okCount:
		candidate = StateDown
	default:
		candidate = StateUp
	}
	if candidate == st.candidate {
		st.consecutive++
	} else {
		st.candidate = candidate
		st.consecutive = 1
	}
	var fireFrom, fireTo State
	var fired bool
	if candidate != st.committed && st.consecutive >= HysteresisCount {
		fireFrom = st.committed
		fireTo = candidate
		st.committed = candidate
		fired = true
	}
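	// Capture the snapshot and callback under the lock, then invoke
	// the callback after unlocking so a slow transition handler cannot
	// block Submit on other goroutines.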
	snap := a.snapshotLocked(checkID, st)
	fn := a.transition
	a.mu.Unlock()
	if fired && fn != nil {
		fn(check, fireFrom, fireTo, snap)
	}
}

func (a *Aggregator) snapshotLocked(checkID string, st *checkState) Snapshot {
	check := a.lookupCheck(checkID)
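	// The check may be gone from the config by the time a snapshot is
	// taken; fall back to a conservative 60-second freshness window.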
	freshWindow := 60 * time.Second
	if check != nil {
		freshWindow = freshWindowFor(check.Interval)
	}
	cutoff := time.Now().Add(-freshWindow)
	var okCount, notOK int
	var detail string
	for _, nr := range st.latest {
		if nr.At.Before(cutoff) {
			continue
		}
		if nr.OK {
			okCount++
		} else {
			notOK++
			if detail == "" {
				detail = nr.Detail
			}
		}
	}
	return Snapshot{
		CheckID:  checkID,
		State:    st.committed,
		Reports:  okCount + notOK,
		OKCount:  okCount,
		NotOK:    notOK,
		Detail:   detail,
		UpdateAt: time.Now().UTC(),
	}
}

func (a *Aggregator) lookupCheck(id string) *config.Check {
	snap := a.cluster.Snapshot()
	for i := range snap.Checks {
		if snap.Checks[i].ID == id {
			c := snap.Checks[i]
			return &c
		}
	}
	return nil
}

// freshWindowFor returns the staleness threshold for a check given
// its configured interval. Anything older than this is considered too
// stale to count.
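// With the values below: a 10s interval yields the 30s floor, a 60s
// interval yields 180s (3× the interval), and an unset interval (≤0)
// yields the 60s default.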
func freshWindowFor(interval time.Duration) time.Duration {
	if interval <= 0 {
		return 60 * time.Second
	}
	w := interval * 3
	if w < 30*time.Second {
		w = 30 * time.Second
	}
	return w
}