// Package source abstracts the origin of captured HTTP bytes. The main
// binary composes a Source with a parser.Parser and a tui.App.
//
// mercemay.top/src/httptap/
package source
import (
"context"
"errors"
"io"
"time"
)
// Event is what a Source hands out whenever a complete framed byte stream
// is ready for one half of a connection.
type Event struct {
	// ConnID is the numeric identifier of the connection this event
	// belongs to (presumably unique per captured connection; the
	// producing Source decides — confirm against implementations).
	ConnID uint64

	// Direction tells which half of the connection produced Payload:
	// 0 = client→server, 1 = server→client.
	Direction uint8

	// Payload holds the framed bytes for this connection-half.
	Payload []byte

	// At is the timestamp attached to the event by its Source.
	At time.Time
}
// Source is a pull-mode capture provider: the consumer asks for events one
// at a time instead of having them pushed to it. Implementations include:
//
//   - unix.Reader: read framed messages from a unix-domain socket
//   - stdin.Source: parse raw HTTP pasted on stdin
//   - file.Source: replay a .har or .txt dump
type Source interface {
	// Next returns the next available Event. It blocks until one of three
	// things happens: an event becomes available, ctx is cancelled, or the
	// underlying stream ends — in the last case the error is io.EOF.
	Next(ctx context.Context) (*Event, error)

	// Close frees any OS resources held by the Source and unblocks calls
	// to Next that are still pending.
	Close() error
}
// ErrClosed is the sentinel error returned by Next after Close. Within this
// file, MultiSource.Next returns it once the MultiSource has been closed,
// and the pump loop treats it (like io.EOF) as a signal to stop reading
// from a wrapped Source. Compare against it with errors.Is.
var ErrClosed = errors.New("source: closed")
// MultiSource merges several Sources into a single stream of events.
// Events coming from one Source keep their relative order; events from
// different Sources are interleaved in the order they arrive.
type MultiSource struct {
	// subs are the wrapped Sources; Close closes each of them.
	subs []Source

	// out is the shared channel that pump goroutines send events on and
	// that Next receives from.
	out chan *Event

	// done is closed by Close to tell pump goroutines and blocked Next
	// callers that the MultiSource is shutting down.
	done chan struct{}
}
// NewMulti creates a MultiSource over subs and immediately starts one
// background pump goroutine per Source, each forwarding that Source's
// events onto the shared, buffered (64-entry) output channel. Close stops
// all of them.
//
// The subs slice is copied, so a caller that passes an existing slice with
// NewMulti(list...) can keep mutating list afterwards without changing
// which Sources the MultiSource closes.
func NewMulti(subs ...Source) *MultiSource {
	m := &MultiSource{
		// Defensive copy: a variadic argument aliases the caller's backing
		// array when invoked as NewMulti(list...).
		subs: append([]Source(nil), subs...),
		out:  make(chan *Event, 64),
		done: make(chan struct{}),
	}
	for _, s := range m.subs {
		go m.pump(s)
	}
	return m
}
// pump forwards events from s onto m.out until s is exhausted or the
// MultiSource is closed. One pump goroutine runs per wrapped Source.
//
// The context handed to s.Next is cancelled as soon as m.done is closed, so
// a Source that honours its context is released promptly on shutdown even
// if its own Close does not unblock Next.
//
// Termination rules:
//   - io.EOF or ErrClosed from s.Next ends the pump.
//   - Any other error ends the pump if the MultiSource has been closed;
//     otherwise the error is dropped and Next is retried.
//   - A closed m.done ends the pump while it is waiting to deliver an event.
//
// NOTE(review): a Source that keeps failing with a non-terminal error while
// the MultiSource is still open is retried immediately, with no backoff.
func (m *MultiSource) pump(s Source) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Bridge m.done to ctx. The watcher exits either when the MultiSource
	// closes or when pump returns (the deferred cancel closes ctx.Done).
	go func() {
		select {
		case <-m.done:
			cancel()
		case <-ctx.Done():
		}
	}()

	for {
		ev, err := s.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, ErrClosed) {
				return
			}
			// After shutdown a Source may report cancellation or its own
			// flavour of "closed" error; retrying then would spin forever.
			select {
			case <-m.done:
				return
			default:
			}
			continue
		}
		select {
		case m.out <- ev:
		case <-m.done:
			return
		}
	}
}
// Next hands back the next event from the aggregated channel. It blocks
// until one of the following is ready:
//
//   - an event forwarded by a pump goroutine, returned with a nil error;
//   - cancellation of ctx, reported as ctx.Err();
//   - closure of the MultiSource, reported as ErrClosed.
//
// When more than one of these is ready at once, select picks among them
// at random.
//
// NOTE(review): no path here yields io.EOF; once every wrapped Source is
// exhausted, Next keeps blocking until ctx is cancelled or Close is called.
func (m *MultiSource) Next(ctx context.Context) (*Event, error) {
	select {
	case <-m.done:
		return nil, ErrClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	case next := <-m.out:
		return next, nil
	}
}
// Close shuts the MultiSource down: it closes the done channel, which stops
// the pump goroutines and releases blocked Next callers with ErrClosed,
// and then closes every wrapped Source.
//
// Every wrapped Source is closed even if an earlier one fails; the first
// non-nil error encountered is returned. Calling Close again after it has
// returned is a no-op that returns nil rather than panicking on a double
// channel close.
//
// NOTE(review): the already-closed check is not synchronised, so Close must
// not be invoked concurrently from several goroutines.
func (m *MultiSource) Close() error {
	select {
	case <-m.done:
		// Already closed; closing the channel a second time would panic.
		return nil
	default:
	}
	close(m.done)

	var first error
	for _, s := range m.subs {
		if err := s.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}