// Package adaptive implements a QPS-driven sampler. The rate is adjusted so
// the expected emit rate stays near TargetQPS regardless of input volume.
//
// See mercemay.top/src/lambdalog/internal/sampler/adaptive/.
package adaptive
import (
"math"
"math/rand"
"sync"
"time"
"mercemay.top/src/lambdalog/internal/sampler"
)
// Window is the observation window used for rate estimation. It is kept
// short so the sampler reacts quickly to burst traffic.
var Window = time.Second
// Sampler tracks incoming rate and emits a stochastic fraction of records
// such that the expected output rate approximates TargetQPS.
type Sampler struct {
// TargetQPS is the desired emit rate in records per second.
TargetQPS float64
// Floor is the minimum fraction of records emitted, even when the
// input rate is high. 0.0 means "drop without limit".
Floor float64
mu sync.Mutex
rng *rand.Rand
windowAt time.Time
count int
rate float64 // cached sampling rate for the current window
}
// Name returns a stable identifier.
func (s *Sampler) Name() string { return "adaptive" }
// Sample applies the current sampling rate. It also updates the running
// counter that feeds the rate recomputation at window boundaries.
func (s *Sampler) Sample(in sampler.Input) sampler.Decision {
s.mu.Lock()
now := in.Now
if now.IsZero() {
now = time.Now()
}
if s.rng == nil {
s.rng = rand.New(rand.NewSource(now.UnixNano()))
}
if s.windowAt.IsZero() {
s.windowAt = now
s.rate = 1
}
if now.Sub(s.windowAt) >= Window {
s.recomputeLocked(now)
}
s.count++
rate := s.rate
v := s.rng.Float64()
s.mu.Unlock()
if v <= rate {
return sampler.Emit
}
return sampler.Drop
}
func (s *Sampler) recomputeLocked(now time.Time) {
elapsed := now.Sub(s.windowAt).Seconds()
if elapsed <= 0 {
s.windowAt = now
s.count = 0
return
}
observedQPS := float64(s.count) / elapsed
if observedQPS <= 0 || s.TargetQPS <= 0 {
s.rate = 1
} else {
s.rate = math.Min(1, s.TargetQPS/observedQPS)
}
if s.Floor > 0 && s.rate < s.Floor {
s.rate = s.Floor
}
s.windowAt = now
s.count = 0
}
// Stats returns a snapshot of internal counters. It is used by tests and by
// the /debug/sampler admin endpoint.
type Stats struct {
Rate float64
Count int
WindowStart time.Time
}
// Snapshot returns the current Stats without resetting them.
func (s *Sampler) Snapshot() Stats {
s.mu.Lock()
defer s.mu.Unlock()
return Stats{Rate: s.rate, Count: s.count, WindowStart: s.windowAt}
}
// Reset forces an immediate recompute as if a fresh window just started.
// The 8f5d11a commit added this because cold-start invocations were being
// throttled by the previous container's stats.
func (s *Sampler) Reset(now time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.windowAt = now
s.count = 0
s.rate = 1
}
// SeedFixed is a test helper.
func (s *Sampler) SeedFixed(seed int64) {
s.mu.Lock()
s.rng = rand.New(rand.NewSource(seed))
s.mu.Unlock()
}
// New returns a Sampler with sensible defaults for Lambda workloads.
func New(targetQPS float64) *Sampler {
return &Sampler{TargetQPS: targetQPS, Floor: 0.01}
}
// Probability estimates the current sampling probability. It is intended
// for exposition (e.g. health checks) and makes no guarantee about
// consistency with concurrent Sample calls.
func (s *Sampler) Probability() float64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.rate
}