// adapters/lambda/invoke.go

package lambda

import (
	"context"
	"sync/atomic"
	"time"
)

// Invocation describes a single handler call. The fields are exported so
// downstream reports can serialise them.
//
// NOTE(review): nothing in the visible part of this file assigns RequestID;
// recorderWrapped.Invoke fills in only the two timestamps and Error. Confirm
// whether some other caller is expected to set it.
type Invocation struct {
	RequestID string    // identifier of the call; see the review note above
	StartedAt time.Time // start of the call
	EndedAt   time.Time // end of the call
	Error     string    // error text of a failed call; empty means success
}

// Recorder accumulates per-invocation records in memory. It is intended for
// local development and integration tests; production paths should emit
// records via the logger instead.
//
// The history lives behind an atomic pointer to a slice. Record never edits
// the stored slice: it builds a new one and swaps the pointer with
// compare-and-swap. A nil pointer, which is what the zero value holds, means
// nothing has been stored yet, so a zero Recorder is ready to use.
type Recorder struct {
	items atomic.Pointer[[]Invocation]
}

// Record appends inv to the history.
//
// The stored slice is never modified. Each attempt copies the current history
// into a new slice sized for exactly one more element, adds inv, and publishes
// the result with compare-and-swap. If the pointer changed in the meantime the
// attempt is discarded and retried against the newer history, so no record is
// lost. Every call therefore costs time and memory proportional to the number
// of records already held.
func (r *Recorder) Record(inv Invocation) {
	for {
		cur := r.items.Load()

		// A nil pointer means nothing has been stored yet; treat it as empty.
		var prev []Invocation
		if cur != nil {
			prev = *cur
		}

		// One exactly-sized allocation: room for the old records plus inv.
		next := make([]Invocation, len(prev), len(prev)+1)
		copy(next, prev)
		next = append(next, inv)

		if r.items.CompareAndSwap(cur, &next) {
			return
		}
	}
}

// Snapshot hands back an independent copy of the recorded history, so the
// caller may modify the result freely. It returns nil when nothing has ever
// been stored in the Recorder.
func (r *Recorder) Snapshot() []Invocation {
	if stored := r.items.Load(); stored != nil {
		history := *stored
		return append(make([]Invocation, 0, len(history)), history...)
	}
	return nil
}

// Reset discards everything recorded so far by publishing a pointer to a new,
// empty slice. Because the stored pointer is then non-nil, a Snapshot taken
// after Reset yields an empty slice rather than nil.
func (r *Recorder) Reset() {
	r.items.Store(new([]Invocation))
}

// WithRecorder wraps next in a handler that appends one Invocation to rec for
// every call made through it and otherwise returns next's result unchanged.
//
// logger is stored on the wrapper. NOTE(review): the Invoke method visible in
// this file never reads it, so no logging happens there — confirm whether
// that is intended.
func WithRecorder(logger Logger, next interface {
	Invoke(context.Context, []byte) ([]byte, error)
}, rec *Recorder) interface {
	Invoke(context.Context, []byte) ([]byte, error)
} {
	wrapped := new(recorderWrapped)
	wrapped.logger = logger
	wrapped.next = next
	wrapped.rec = rec
	return wrapped
}

// recorderWrapped is the handler built by WithRecorder. Its Invoke method
// forwards each call to next, the wrapped handler, and then hands an
// Invocation describing the call to rec.
type recorderWrapped struct {
	logger Logger // set by WithRecorder; the Invoke method in this file does not read it
	next   interface {
		Invoke(context.Context, []byte) ([]byte, error)
	}
	rec *Recorder // receives one Invocation per call
}

// Invoke forwards ctx and payload to the wrapped handler and returns its
// output and error unchanged.
//
// After the handler returns, one Invocation is passed to the recorder with the
// start and end timestamps and, when the handler failed, the error text.
// RequestID is left empty and the logger is not used here. If the handler
// panics, the panic propagates and nothing is recorded.
//
// A nil recorder is tolerated: the call is still forwarded, it is simply not
// recorded. Without this guard a wrapper built with a nil recorder would
// panic after the handler had already done its work.
func (w *recorderWrapped) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
	start := time.Now()
	out, err := w.next.Invoke(ctx, payload)
	end := time.Now()

	if w.rec == nil {
		return out, err
	}

	inv := Invocation{StartedAt: start, EndedAt: end}
	if err != nil {
		inv.Error = err.Error()
	}
	w.rec.Record(inv)
	return out, err
}

// Duration reports how long the invocation took, computed as EndedAt minus
// StartedAt. The result is negative when EndedAt precedes StartedAt.
func (inv Invocation) Duration() time.Duration {
	began, finished := inv.StartedAt, inv.EndedAt
	return finished.Sub(began)
}

// Succeeded is true when the invocation carries no error text, that is, when
// Error is the empty string.
func (inv Invocation) Succeeded() bool {
	return len(inv.Error) == 0
}

// Summary aggregates the Recorder content into a simple stats block.
type Summary struct {
	Total     int     // number of invocations counted
	Succeeded int     // invocations whose Error is empty
	Failed    int     // invocations whose Error is non-empty
	AvgMs     float64 // mean invocation duration in milliseconds; 0 when Total is 0
}

// Summarise produces a Summary from the current snapshot.
//
// Total is the number of recorded invocations, split into Succeeded and
// Failed by Invocation.Succeeded. AvgMs is the mean of Invocation.Duration
// expressed in milliseconds, fractional part included. An empty history
// yields the zero Summary.
func (r *Recorder) Summarise() Summary {
	items := r.Snapshot()
	sum := Summary{Total: len(items)}
	if sum.Total == 0 {
		return sum
	}

	var total time.Duration
	for _, it := range items {
		if it.Succeeded() {
			sum.Succeeded++
		} else {
			sum.Failed++
		}
		total += it.Duration()
	}

	// Divide the nanosecond total as a float. Going through
	// total.Milliseconds() would truncate to whole milliseconds first and
	// report 0 for handlers that finish in under a millisecond.
	sum.AvgMs = float64(total) / float64(time.Millisecond) / float64(sum.Total)
	return sum
}