internal/sampler/adaptive/adaptive.go

// Package adaptive implements a QPS-driven sampler. The rate is adjusted so
// the expected emit rate stays near TargetQPS regardless of input volume.
//
// See mercemay.top/src/lambdalog/internal/sampler/adaptive/.
package adaptive

import (
	"math"
	"math/rand"
	"sync"
	"time"

	"mercemay.top/src/lambdalog/internal/sampler"
)

var (
	// Window is how long a sampler observes traffic before it re-derives its
	// sampling rate. It is deliberately short so the rate reacts quickly to
	// bursts.
	//
	// Sample reads this package-level value on every call without any
	// package-level synchronization, so set it before samplers are in use.
	Window = 1000 * time.Millisecond
)

// Sampler emits a random fraction of the records it sees. It chooses that
// fraction so that the expected number of emitted records per second stays
// close to TargetQPS.
type Sampler struct {
	// TargetQPS is the desired emit rate, in records per second. A value
	// <= 0 disables throttling, so every record is emitted.
	TargetQPS float64
	// Floor is the lowest fraction of records that is emitted, however high
	// the input rate climbs. A value <= 0 applies no floor ("drop without
	// limit").
	Floor float64

	mu sync.Mutex // guards the fields below

	rng      *rand.Rand // created on first Sample unless SeedFixed set it
	windowAt time.Time  // start of the current observation window
	count    int        // records seen since windowAt
	rate     float64    // sampling rate cached for the current window
}

// Name reports the fixed identifier "adaptive" for this sampler type.
func (s *Sampler) Name() string {
	const id = "adaptive"
	return id
}

// Sample decides whether a single record is emitted or dropped.
//
// The record's timestamp is in.Now, or time.Now() when in.Now is zero. On
// first use the sampler does two things: it seeds its RNG from that
// timestamp (unless SeedFixed already set one), and it opens a window with
// rate 1. Once the current window has lasted at least Window, the rate is
// recomputed from the records counted in it. Every call is counted toward
// the current window, whether or not the record is emitted.
//
// The record is emitted with probability equal to the cached rate.
func (s *Sampler) Sample(in sampler.Input) sampler.Decision {
	s.mu.Lock()
	now := in.Now
	if now.IsZero() {
		now = time.Now()
	}
	if s.rng == nil {
		s.rng = rand.New(rand.NewSource(now.UnixNano()))
	}
	if s.windowAt.IsZero() {
		// Fresh sampler (or Reset with a zero time): start emitting everything.
		s.windowAt = now
		s.rate = 1
	}
	if now.Sub(s.windowAt) >= Window {
		s.recomputeLocked(now)
	}
	s.count++
	rate := s.rate
	v := s.rng.Float64()
	s.mu.Unlock()

	// rand.Float64 returns a value in [0, 1), so v < rate holds with
	// probability exactly rate. A <= comparison would also emit when
	// v == 0, so records would leak even at a rate of 0.
	if v < rate {
		return sampler.Emit
	}
	return sampler.Drop
}

func (s *Sampler) recomputeLocked(now time.Time) {
	elapsed := now.Sub(s.windowAt).Seconds()
	if elapsed <= 0 {
		s.windowAt = now
		s.count = 0
		return
	}
	observedQPS := float64(s.count) / elapsed
	if observedQPS <= 0 || s.TargetQPS <= 0 {
		s.rate = 1
	} else {
		s.rate = math.Min(1, s.TargetQPS/observedQPS)
	}
	if s.Floor > 0 && s.rate < s.Floor {
		s.rate = s.Floor
	}
	s.windowAt = now
	s.count = 0
}

// Stats is a point-in-time copy of a Sampler's internal counters, as
// returned by Snapshot. It is used by tests and by the /debug/sampler admin
// endpoint.
type Stats struct {
	// Rate is the sampling rate cached for the current window.
	Rate float64
	// Count is the number of records seen since WindowStart.
	Count int
	// WindowStart is when the current observation window began. It is zero
	// before the first Sample call.
	WindowStart time.Time
}

// Snapshot copies the sampler's current rate, record count and window start
// into a Stats value. The counters themselves are left untouched.
func (s *Sampler) Snapshot() Stats {
	s.mu.Lock()
	st := Stats{
		Rate:        s.rate,
		Count:       s.count,
		WindowStart: s.windowAt,
	}
	s.mu.Unlock()
	return st
}

// Reset discards the current window's statistics and starts a fresh window
// at now. The sampling rate is restored to 1 (emit everything) until the
// next recompute. Reset does not compute a new rate itself. If now is the
// zero time, the next Sample call opens the window at its own timestamp.
//
// Commit 8f5d11a added this because cold-start invocations were being
// throttled by the previous container's stats.
func (s *Sampler) Reset(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.windowAt = now
	s.count = 0
	s.rate = 1
}

// SeedFixed replaces the sampler's RNG with one seeded deterministically
// from seed, so that tests get reproducible emit/drop decisions.
func (s *Sampler) SeedFixed(seed int64) {
	src := rand.NewSource(seed)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rng = rand.New(src)
}

// New builds a Sampler aimed at targetQPS records per second, using
// sensible defaults for Lambda workloads: at least 1% of records are always
// emitted.
func New(targetQPS float64) *Sampler {
	const defaultFloor = 0.01
	s := new(Sampler)
	s.TargetQPS = targetQPS
	s.Floor = defaultFloor
	return s
}

// Probability reports the sampling probability that the next Sample call is
// expected to use.
//
// Before the first Sample call no window exists, and Sample would open one
// at rate 1. In that case Probability reports 1 rather than the cached rate,
// which has not been set yet and is 0.
//
// It is intended for exposition (e.g. health checks) and makes no guarantee
// about consistency with concurrent Sample calls. In particular, it does not
// apply a window recompute that is due but has not yet run.
func (s *Sampler) Probability() float64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.windowAt.IsZero() {
		return 1
	}
	return s.rate
}