internal/source/source.go

// Package source abstracts the origin of captured HTTP bytes. The main
// binary composes a Source with a parser.Parser and a tui.App.
//
// mercemay.top/src/httptap/
package source

import (
	"context"
	"errors"
	"io"
	"time"
)

// Event is emitted by a Source each time a complete framed byte stream
// is available for a single connection-half.
type Event struct {
	ConnID    uint64
	Direction uint8 // 0=client→server, 1=server→client
	Payload   []byte
	At        time.Time
}

// Source is a pull-mode capture provider. Implementations include:
//
//   - unix.Reader: read framed messages from a unix-domain socket
//   - stdin.Source: parse raw HTTP pasted on stdin
//   - file.Source: replay a .har or .txt dump
type Source interface {
	// Next blocks until an event is available, ctx is cancelled, or the
	// underlying stream ends (in which case io.EOF is returned).
	Next(ctx context.Context) (*Event, error)

	// Close releases any OS resources and unblocks pending Next calls.
	Close() error
}

// ErrClosed is returned by Next after Close.
var ErrClosed = errors.New("source: closed")

// MultiSource fans several Sources into one. Ordering is preserved
// per-Source but interleaved across Sources by arrival.
type MultiSource struct {
	subs []Source
	out  chan *Event
	done chan struct{}
}

// NewMulti creates a MultiSource that begins pumping subscribers in the
// background. Close stops all of them.
func NewMulti(subs ...Source) *MultiSource {
	m := &MultiSource{
		subs: subs,
		out:  make(chan *Event, 64),
		done: make(chan struct{}),
	}
	for _, s := range subs {
		go m.pump(s)
	}
	return m
}

func (m *MultiSource) pump(s Source) {
	ctx := context.Background()
	for {
		ev, err := s.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, ErrClosed) {
				return
			}
			continue
		}
		select {
		case m.out <- ev:
		case <-m.done:
			return
		}
	}
}

// Next receives from the aggregated channel.
func (m *MultiSource) Next(ctx context.Context) (*Event, error) {
	select {
	case ev := <-m.out:
		return ev, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-m.done:
		return nil, ErrClosed
	}
}

// Close tears down all wrapped Sources.
func (m *MultiSource) Close() error {
	close(m.done)
	for _, s := range m.subs {
		_ = s.Close()
	}
	return nil
}