package lambdalog
import (
"sync/atomic"
"time"
)
// Sampler implements a tiny adaptive 1-in-N policy.
//
// While the observed rate (events per sampling window) is below a fixed
// threshold, every event is allowed through. Once the rate climbs above
// the threshold the sampler switches to emitting one in N; if the rate
// later drops back below threshold, it falls back to pass-through.
//
// The sampler is goroutine-safe. All decisions are made with atomic
// operations, no mutex is held on the hot path.
type Sampler struct {
name string
n int64
seen atomic.Int64 // total events observed in the current window
windowAt atomic.Int64 // unix nanos of the current window start
passing atomic.Bool // current decision: pass-through or 1-in-N
threshold int64
window time.Duration
now func() time.Time
}
// NewSampler returns a sampler named `name` that drops all but 1-in-N once
// a traffic threshold is exceeded. Passing n <= 1 disables sampling
// (every event is allowed).
func NewSampler(name string, n int) *Sampler {
if n <= 1 {
n = 1
}
s := &Sampler{
name: name,
n: int64(n),
threshold: 1000,
window: time.Second,
now: time.Now,
}
s.passing.Store(true)
s.windowAt.Store(time.Now().UnixNano())
return s
}
// SetThreshold configures the events-per-window floor below which the
// sampler stays in pass-through mode. It should be called before the
// sampler is shared between goroutines.
func (s *Sampler) SetThreshold(eventsPerWindow int64, window time.Duration) {
if eventsPerWindow < 1 {
eventsPerWindow = 1
}
if window <= 0 {
window = time.Second
}
s.threshold = eventsPerWindow
s.window = window
}
// Reset zeroes the observed counters. Call on Lambda cold start so the
// decision from a previous container does not carry over.
func (s *Sampler) Reset() {
s.seen.Store(0)
s.windowAt.Store(s.now().UnixNano())
s.passing.Store(true)
}
// Allow records an observation and returns whether the current event should
// be emitted. It is safe to call concurrently from many goroutines.
func (s *Sampler) Allow() bool {
count := s.seen.Add(1)
windowStart := s.windowAt.Load()
nowNanos := s.now().UnixNano()
if time.Duration(nowNanos-windowStart) >= s.window {
// Attempt to rotate the window; only one caller will win the CAS.
if s.windowAt.CompareAndSwap(windowStart, nowNanos) {
// If we exceeded the threshold during the prior window, switch
// to 1-in-N for the next one; otherwise pass through.
s.passing.Store(count < s.threshold)
s.seen.Store(0)
}
}
if s.passing.Load() {
return true
}
// 1-in-N: allow when the counter is congruent to 0 modulo n.
if s.n <= 1 {
return true
}
return count%s.n == 0
}
// Name identifies the sampler in logs or metrics.
func (s *Sampler) Name() string { return s.name }