internal/pipeline/pipeline_test.go

package pipeline

import (
	"context"
	"errors"
	"testing"
)

func TestPipelineRunsAllStagesInOrder(t *testing.T) {
	t.Parallel()
	var order []string
	s1 := StageFunc{N: "a", Fn: func(_ context.Context, _ *State) error {
		order = append(order, "a")
		return nil
	}}
	s2 := StageFunc{N: "b", Fn: func(_ context.Context, _ *State) error {
		order = append(order, "b")
		return nil
	}}
	p := New(s1, s2)
	if err := p.Run(context.Background(), &State{}); err != nil {
		t.Fatalf("Run: %v", err)
	}
	if len(order) != 2 || order[0] != "a" || order[1] != "b" {
		t.Errorf("order wrong: %v", order)
	}
}

func TestPipelineContinuesOnNonFatalError(t *testing.T) {
	t.Parallel()
	p := New(
		StageFunc{N: "bad", Fn: func(_ context.Context, _ *State) error {
			return errors.New("broke")
		}},
		StageFunc{N: "good", Fn: func(_ context.Context, _ *State) error {
			return nil
		}},
	)
	st := &State{}
	if err := p.Run(context.Background(), st); err != nil {
		t.Fatalf("Run: %v", err)
	}
	if len(st.Errors) != 1 {
		t.Errorf("expected 1 recorded error, got %d", len(st.Errors))
	}
}

func TestPipelineAbortsOnFatal(t *testing.T) {
	t.Parallel()
	called := false
	p := New(
		StageFunc{N: "bad", Fn: func(_ context.Context, _ *State) error {
			return Fatal(errors.New("stop"))
		}},
		StageFunc{N: "never", Fn: func(_ context.Context, _ *State) error {
			called = true
			return nil
		}},
	)
	if err := p.Run(context.Background(), &State{}); err == nil {
		t.Fatal("expected fatal error")
	}
	if called {
		t.Error("second stage should not have run")
	}
}

func TestPipelineContextCancelled(t *testing.T) {
	t.Parallel()
	t.Helper()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	p := New(StageFunc{N: "noop", Fn: func(context.Context, *State) error { return nil }})
	if err := p.Run(ctx, &State{}); err == nil {
		t.Error("expected context error")
	}
}