adapters/sns/subscriber.go

// Package sns adapts lambdalog to SNS event sources. Each record is logged
// independently; SNS has no concept of partial batch failures so handler
// errors abort processing of the remaining records.
//
// See mercemay.top/src/lambdalog/adapters/sns/.
package sns

import (
	"context"
	"errors"
	"time"

	"github.com/aws/aws-lambda-go/events"
)

// Logger is the narrow interface consumed by the adapter. Any logger that
// provides these three methods can be passed to Handler.
type Logger interface {
	// Info is called by Handler for the per-record start entry and for the
	// end entry of a record that did not fail.
	Info(msg string, fields ...Field)
	// Error is called by Handler for the end entry of a record whose
	// MessageHandler failed.
	Error(msg string, fields ...Field)
	// With returns a Logger built from the given fields. Handler calls it
	// once per record with topic_arn and message_id and logs through the
	// result. NOTE(review): presumably implementations attach those fields
	// to every entry written via the returned Logger; confirm against the
	// concrete logger.
	With(fields ...Field) Logger
}

// Field mirrors encoder.Field. It is one key/value pair passed to a Logger
// method: Key is the field name (the adapter uses names such as "topic_arn"
// and "duration_ms") and Value is the associated value, which may be of any
// type.
type Field struct {
	Key   string
	Value any
}

// MessageHandler processes a single SNS record. Handler calls it once per
// record, passing along the context that Handler itself received. A nil
// return means the record was processed; how a non-nil error is treated is
// decided by Handler.
type MessageHandler func(ctx context.Context, record events.SNSEventRecord) error

// Handler returns an SNS-compatible Lambda handler that feeds each record of
// the event to msg, in order, and logs a start/end pair per record.
//
// Every entry for a record is written through the Logger obtained from
// logger.With with the record's topic_arn and message_id. The closing
// "sns.message.end" entry carries duration_ms, the time elapsed since the
// record was picked up, in milliseconds.
//
// The error returned by msg decides what happens next:
//   - nil: the end entry is logged at info level and the loop continues.
//   - an error recognised by IsSkip: the record was skipped on purpose; the
//     end entry is logged at info level with skipped=true and the skip
//     reason, and the loop continues with the next record.
//   - any other error: the end entry is logged at error level with the error
//     text and that error is returned immediately, leaving the remaining
//     records unprocessed.
//
// The returned handler yields nil when no record produced a non-skip error,
// including when the event carries no records.
func Handler(logger Logger, msg MessageHandler) func(context.Context, events.SNSEvent) error {
	return func(ctx context.Context, evt events.SNSEvent) error {
		for _, r := range evt.Records {
			start := time.Now()
			l := logger.With(
				Field{Key: "topic_arn", Value: r.SNS.TopicArn},
				Field{Key: "message_id", Value: r.SNS.MessageID},
			)
			l.Info("sns.message.start")

			err := msg(ctx, r)
			dur := Field{Key: "duration_ms", Value: time.Since(start).Milliseconds()}

			switch {
			case err == nil:
				l.Info("sns.message.end", dur)
			case IsSkip(err):
				// An intentional skip is not a failure: keep it out of the
				// error stream and carry on with the rest of the batch.
				l.Info("sns.message.end", dur,
					Field{Key: "skipped", Value: true},
					Field{Key: "reason", Value: err.Error()})
			default:
				l.Error("sns.message.end", dur,
					Field{Key: "error", Value: err.Error()})
				return err
			}
		}
		return nil
	}
}

// AttributeMap flattens the record's SNS message attributes into a plain
// map[string]any. An attribute whose value is itself a map[string]any
// holding a "Value" key is reduced to that inner value, which drops sibling
// entries such as "Type" that are rarely useful in log output. Every other
// attribute is copied through unchanged.
func AttributeMap(rec events.SNSEventRecord) map[string]any {
	attrs := rec.SNS.MessageAttributes
	flat := make(map[string]any, len(attrs))
	for name, raw := range attrs {
		// Start from the raw attribute and overwrite it only when an inner
		// "Value" entry is actually present.
		flat[name] = raw
		inner, isMap := raw.(map[string]any)
		if !isMap {
			continue
		}
		if value, found := inner["Value"]; found {
			flat[name] = value
		}
	}
	return flat
}

// skipError is the error type behind Skip. A MessageHandler returns it to
// signal that a record was intentionally skipped (for instance, because it
// was a duplicate) rather than failed. The type is unexported: create values
// with Skip and detect them with IsSkip.
type skipError struct{ Reason string }

// Error implements the error interface, prefixing the reason with
// "sns: skipped: ".
func (e *skipError) Error() string { return "sns: skipped: " + e.Reason }

// Skip returns an error carrying reason that marks a record as intentionally
// skipped. It is meant to be treated by the adapter as a non-failure: logged
// at info level, with the rest of the batch still processed. Use IsSkip to
// recognise it.
func Skip(reason string) error {
	return &skipError{Reason: reason}
}

// IsSkip reports whether err, or any error in its Unwrap chain, was produced
// by Skip. Walking the chain means a skip that a handler has decorated with
// extra context (for example via fmt.Errorf and %w) is still recognised. A
// nil err yields false.
func IsSkip(err error) bool {
	var se *skipError
	return errors.As(err, &se)
}