sampler.go

package lambdalog

import (
	"sync/atomic"
	"time"
)

// Sampler implements a tiny adaptive 1-in-N policy.
//
// While the observed rate (events per sampling window) is below a fixed
// threshold, every event is allowed through. Once the rate climbs above
// the threshold the sampler switches to emitting one in N; if the rate
// later drops back below threshold, it falls back to pass-through.
//
// The sampler is goroutine-safe. All decisions are made with atomic
// operations, no mutex is held on the hot path.
type Sampler struct {
	name     string
	n        int64
	seen     atomic.Int64 // total events observed in the current window
	windowAt atomic.Int64 // unix nanos of the current window start
	passing  atomic.Bool  // current decision: pass-through or 1-in-N
	threshold int64
	window    time.Duration
	now      func() time.Time
}

// NewSampler returns a sampler named `name` that drops all but 1-in-N once
// a traffic threshold is exceeded. Passing n <= 1 disables sampling
// (every event is allowed).
func NewSampler(name string, n int) *Sampler {
	if n <= 1 {
		n = 1
	}
	s := &Sampler{
		name:      name,
		n:         int64(n),
		threshold: 1000,
		window:    time.Second,
		now:       time.Now,
	}
	s.passing.Store(true)
	s.windowAt.Store(time.Now().UnixNano())
	return s
}

// SetThreshold configures the events-per-window floor below which the
// sampler stays in pass-through mode. It should be called before the
// sampler is shared between goroutines.
func (s *Sampler) SetThreshold(eventsPerWindow int64, window time.Duration) {
	if eventsPerWindow < 1 {
		eventsPerWindow = 1
	}
	if window <= 0 {
		window = time.Second
	}
	s.threshold = eventsPerWindow
	s.window = window
}

// Reset zeroes the observed counters. Call on Lambda cold start so the
// decision from a previous container does not carry over.
func (s *Sampler) Reset() {
	s.seen.Store(0)
	s.windowAt.Store(s.now().UnixNano())
	s.passing.Store(true)
}

// Allow records an observation and returns whether the current event should
// be emitted. It is safe to call concurrently from many goroutines.
func (s *Sampler) Allow() bool {
	count := s.seen.Add(1)

	windowStart := s.windowAt.Load()
	nowNanos := s.now().UnixNano()
	if time.Duration(nowNanos-windowStart) >= s.window {
		// Attempt to rotate the window; only one caller will win the CAS.
		if s.windowAt.CompareAndSwap(windowStart, nowNanos) {
			// If we exceeded the threshold during the prior window, switch
			// to 1-in-N for the next one; otherwise pass through.
			s.passing.Store(count < s.threshold)
			s.seen.Store(0)
		}
	}

	if s.passing.Load() {
		return true
	}
	// 1-in-N: allow when the counter is congruent to 0 modulo n.
	if s.n <= 1 {
		return true
	}
	return count%s.n == 0
}

// Name identifies the sampler in logs or metrics.
func (s *Sampler) Name() string { return s.name }