internal/pipeline/pipeline.go

// Package pipeline orchestrates the full build: load markdown from disk,
// parse it, render HTML, hash for caching, and write output. Each stage
// receives a pointer to the shared State, may mutate it in place, and
// returns an error. See mercemay.top/src/tilstream/ for the high-level
// architecture.
package pipeline

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// State is the mutable value shared by every Stage in a run. Stages receive
// it by pointer and update it in place. It contains a Metrics value, which
// holds a sync.Mutex, so a State must not be copied after first use.
type State struct {
	// SourceDir and OutputDir are the build's input and output directories
	// (presumably markdown sources in, rendered output out — confirm against
	// the load/write stages).
	SourceDir, OutputDir string

	// Posts holds the content items being processed.
	Posts []Post

	// Errors collects stage failures; Pipeline.Run appends each one wrapped
	// with the failing stage's name.
	Errors []error

	// Metrics accumulates per-stage durations.
	Metrics Metrics
}

// Post is one content item flowing through the pipeline.
type Post struct {
	// Path identifies the post's source file (presumably relative to
	// State.SourceDir — TODO confirm).
	Path string

	// Raw holds the unparsed source bytes.
	Raw []byte

	// HTML, Summary, and Hash are derived values (presumably populated by
	// the render and hash stages: rendered markup, an excerpt, and a content
	// hash used for caching — TODO confirm).
	HTML, Summary, Hash string

	// Meta holds string key/value metadata (presumably front matter —
	// TODO confirm).
	Meta map[string]string
}

// Metrics accumulates wall-clock time per pipeline stage, exposed by the
// doctor command. The zero value is ready to use. It holds a sync.Mutex, so
// it must not be copied after first use; Record and Snapshot may be called
// concurrently with each other.
type Metrics struct {
	mu     sync.Mutex
	stages map[string]time.Duration // guarded by mu; allocated on first Record
}

// Record adds d to the running total for the stage called name. Repeated
// calls with the same name sum their durations rather than overwrite.
func (m *Metrics) Record(name string, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stages == nil {
		m.stages = map[string]time.Duration{}
	}
	m.stages[name] = m.stages[name] + d
}

// Snapshot returns an independent copy of the accumulated totals, keyed by
// stage name. The result is never nil, and mutating it does not affect m.
func (m *Metrics) Snapshot() map[string]time.Duration {
	m.mu.Lock()
	defer m.mu.Unlock()
	copied := make(map[string]time.Duration, len(m.stages))
	for stage, total := range m.stages {
		copied[stage] = total
	}
	return copied
}

// Stage is one named step of a Pipeline.
type Stage interface {
	// Run performs the step, reading and mutating st in place. A non-nil
	// error is recorded by the Pipeline; wrap it with Fatal to stop the run.
	Run(ctx context.Context, st *State) error

	// Name identifies the stage in error messages and metrics.
	Name() string
}

// Pipeline runs a fixed, ordered list of stages against a shared State.
// The clock field supplies timestamps for per-stage metrics (presumably
// swappable so in-package tests can inject a fake); a nil clock falls back
// to time.Now, so a zero-value Pipeline is safe to Run.
type Pipeline struct {
	stages []Stage
	clock  func() time.Time
}

// New returns a Pipeline that runs the supplied stages in order, timing
// each with time.Now.
func New(stages ...Stage) *Pipeline {
	return &Pipeline{stages: stages, clock: time.Now}
}

// now returns the current time from p.clock, falling back to time.Now when
// no clock was configured.
func (p *Pipeline) now() time.Time {
	if p.clock == nil {
		return time.Now()
	}
	return p.clock()
}

// Run executes each stage in order against st.
//
// Every stage's duration is added to st.Metrics under the stage's name,
// whether or not it fails. A stage error is wrapped with the stage name and
// appended to st.Errors. If the error is marked with Fatal, Run returns it
// as the stage returned it. Otherwise the next stage runs.
//
// ctx is checked before each stage: once it is cancelled, Run returns
// ctx.Err() without starting further stages. A stage that is already
// running is responsible for observing ctx itself.
func (p *Pipeline) Run(ctx context.Context, st *State) error {
	for _, s := range p.stages {
		if err := ctx.Err(); err != nil {
			return err
		}
		name := s.Name()
		start := p.now()
		err := s.Run(ctx, st)
		// Record allocates its map lazily under its own lock, so no
		// unsynchronized pre-initialization of st.Metrics is needed here.
		st.Metrics.Record(name, p.now().Sub(start))
		if err == nil {
			continue
		}
		st.Errors = append(st.Errors, fmt.Errorf("%s: %w", name, err))
		if isFatal(err) {
			return err
		}
	}
	return nil
}

// fatal wraps an error to mark it as pipeline-halting. It is unexported so
// values are only created through Fatal.
type fatal struct{ err error }

// Error returns the wrapped error's message unchanged.
func (f fatal) Error() string { return f.err.Error() }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (f fatal) Unwrap() error { return f.err }

// Fatal reports that this error should halt the pipeline.
func (f fatal) Fatal() bool { return true }

// Fatal marks err as halting the pipeline. When a stage returns it, either
// directly or wrapped with %w, Run stops and returns the error. Fatal(nil)
// returns nil, so `return Fatal(step())` is safe when step succeeds.
func Fatal(err error) error {
	if err == nil {
		return nil
	}
	return fatal{err: err}
}

// isFatal reports whether the first error in err's chain that implements
// interface{ Fatal() bool } returns true. It uses errors.As rather than a
// plain type assertion, so a Fatal error wrapped with fmt.Errorf("...: %w")
// still halts the pipeline. isFatal(nil) is false.
func isFatal(err error) bool {
	var f interface{ Fatal() bool }
	return errors.As(err, &f) && f.Fatal()
}

// StageFunc lets an ordinary function act as a Stage: N is reported by Name
// and Fn is invoked by Run. Fn must be non-nil; calling Run on a StageFunc
// whose Fn is nil panics.
type StageFunc struct {
	N  string
	Fn func(ctx context.Context, st *State) error
}

// Name reports the stage's label, N.
func (f StageFunc) Name() string {
	return f.N
}

// Run forwards ctx and st to Fn and returns its error unchanged.
func (f StageFunc) Run(ctx context.Context, st *State) error {
	return f.Fn(ctx, st)
}