// Package pipeline orchestrates the full build: load markdown from disk,
// parse it, render HTML, hash for caching, and write output. Each stage
// receives a pointer to the shared State, may mutate it in place, and
// returns an error. See mercemay.top/src/tilstream/ for the high-level
// architecture.
package pipeline
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)
// State is the shared mutable value that flows through every Stage.
//
// It holds a Metrics value, which contains a sync.Mutex, so a State should be
// shared by pointer (as Stage.Run and Pipeline.Run do) rather than copied.
type State struct {
// SourceDir is the input directory; presumably where markdown is loaded
// from — confirm against the loading stage.
SourceDir string
// OutputDir is the output directory; presumably where rendered files are
// written — confirm against the writing stage.
OutputDir string
// Posts holds the content items moving through the pipeline.
Posts []Post
// Errors collects stage failures. Pipeline.Run appends every stage error
// here, prefixed with the name of the stage that returned it.
Errors []error
// Metrics accumulates the per-stage durations recorded by Pipeline.Run.
Metrics Metrics
}
// Post is a single piece of content moving through the pipeline.
//
// NOTE(review): the field descriptions below are inferred from the field
// names and the package comment; the stages that fill them are not visible
// here, so confirm against those stages.
type Post struct {
// Path presumably identifies the source file of the post.
Path string
// Raw presumably holds the unparsed source bytes.
Raw []byte
// HTML presumably holds the rendered output.
HTML string
// Summary presumably holds a short excerpt or description.
Summary string
// Hash presumably holds the content hash used for caching.
Hash string
// Meta holds string key/value metadata; presumably parsed front matter.
Meta map[string]string
}
// Metrics accumulates per-stage timing, exposed by the doctor command.
// The zero value is ready to use. Because it contains a mutex, a Metrics
// value must not be copied once it is in use.
type Metrics struct {
	mu     sync.Mutex               // guards stages
	stages map[string]time.Duration // total duration per stage name; allocated on first Record
}

// Record adds d to the running total kept for the stage called name.
func (m *Metrics) Record(name string, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.stages == nil {
		// First measurement ever: create the map already holding it.
		m.stages = map[string]time.Duration{name: d}
		return
	}
	m.stages[name] = m.stages[name] + d
}

// Snapshot returns an independent copy of the totals recorded so far. The
// result is never nil, and changing it does not affect the Metrics.
func (m *Metrics) Snapshot() map[string]time.Duration {
	m.mu.Lock()
	defer m.mu.Unlock()

	snapshot := make(map[string]time.Duration, len(m.stages))
	for stage, total := range m.stages {
		snapshot[stage] = total
	}
	return snapshot
}
// Stage is a named pipeline step.
type Stage interface {
// Name returns the stage's name. Pipeline.Run uses it as the metrics key
// and as the prefix of any error it records for the stage.
Name() string
// Run performs the step against the shared State. Pipeline.Run records a
// non-nil error on the State and stops the pipeline only when the error
// is fatal.
Run(ctx context.Context, st *State) error
}
// Pipeline composes a list of stages.
type Pipeline struct {
// stages are executed by Run in slice order.
stages []Stage
// clock supplies the timestamps used to time each stage; New sets it to
// time.Now. NOTE(review): presumably a field so tests can substitute a
// deterministic clock — confirm.
clock func() time.Time
}
// New builds a Pipeline that runs the given stages in the order supplied and
// times them with the wall clock (time.Now).
func New(stages ...Stage) *Pipeline {
	p := new(Pipeline)
	p.stages = stages
	p.clock = time.Now
	return p
}
// Run executes each stage in order, timing every stage with the pipeline
// clock and recording the elapsed time on st.Metrics under the stage name.
//
// Before each stage starts, ctx is checked; if it is already done, Run
// returns ctx.Err() without running the remaining stages.
//
// When a stage returns an error, the error is prefixed with the stage name
// and appended to st.Errors. If isFatal reports the error as fatal, Run stops
// and returns the stage's original error. Otherwise the remaining stages still
// run and can inspect st.Errors to decide whether to do any work. Run returns
// nil when no stage failed fatally, even if st.Errors is non-empty.
func (p *Pipeline) Run(ctx context.Context, st *State) error {
	for _, s := range p.stages {
		// Cancellation is only checked between stages; a stage that is
		// already running has to watch ctx itself.
		if err := ctx.Err(); err != nil {
			return err
		}

		start := p.clock()
		err := s.Run(ctx, st)
		name := s.Name()

		// Record allocates the metrics map on demand while holding the
		// Metrics lock, so Run never touches that map directly.
		st.Metrics.Record(name, p.clock().Sub(start))
		if err == nil {
			continue
		}

		st.Errors = append(st.Errors, fmt.Errorf("%s: %w", name, err))
		if isFatal(err) {
			// The caller receives the stage's own error; the name-prefixed
			// copy is the one kept on st.Errors.
			return err
		}
	}
	return nil
}
// fatal wraps an error to mark it as one that must halt the pipeline. It is
// unexported; values are created through Fatal.
type fatal struct{ err error }

// Error returns the message of the wrapped error.
func (f fatal) Error() string { return f.err.Error() }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (f fatal) Unwrap() error { return f.err }

// Fatal reports true, marking the error as pipeline-halting.
func (f fatal) Fatal() bool { return true }

// Fatal marks err as halting the pipeline. A nil err yields nil, so
// "return Fatal(err)" never turns success into a failure (and never produces
// a value whose Error method would dereference a nil error).
func Fatal(err error) error {
	if err == nil {
		return nil
	}
	return fatal{err: err}
}

// isFatal reports whether err, or an error it wraps, declares itself fatal
// through a "Fatal() bool" method. The first error in the chain that has the
// method decides the result, so a fatal error stays fatal after being wrapped
// with additional context (for example via fmt.Errorf with %w).
func isFatal(err error) bool {
	var marker interface{ Fatal() bool }
	if errors.As(err, &marker) {
		return marker.Fatal()
	}
	return false
}
// StageFunc adapts a plain function to the Stage interface.
type StageFunc struct {
// N is the stage name returned by Name.
N string
// Fn is the function invoked by Run. It must be non-nil: Run calls it
// unconditionally.
Fn func(ctx context.Context, st *State) error
}
// Name reports the name stored in the N field.
func (sf StageFunc) Name() string {
	return sf.N
}
// Run invokes the wrapped function Fn with ctx and st and returns its result
// unchanged.
func (sf StageFunc) Run(ctx context.Context, st *State) error {
	fn := sf.Fn
	return fn(ctx, st)
}