internal/source/unix/reader.go

// Package unix implements a source.Source that reads length-prefixed
// capture events from a local unix-domain socket. This is the common
// wire format used by the upstream SSL_read/SSL_write shim that ships
// alongside httptap (see docs/ for the protocol description).
//
// mercemay.top/src/httptap/
package unix

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net"
	"time"

	"mercemay.top/httptap/internal/source"
)

// Reader dials a SOCK_STREAM unix socket at Path and reads framed
// events from it. Each event has a 24-byte header followed by Payload.
//
//	0       7 8        15 16         23 24
//	| magic | direction |  conn-id   | length | payload ... |
//	 uint64    uint8       uint64       uint32   []byte
//
// Multi-byte fields are little-endian.
type Reader struct {
	Path string

	conn net.Conn
	done chan struct{}
}

// Magic is a per-frame sentinel used to resync after partial reads.
const Magic uint64 = 0x485454_50544150 // "HTTPTAP" (ASCII)

var errBadMagic = errors.New("unix: bad frame magic")

// Dial opens the socket. It is safe to call Next immediately afterward.
func Dial(path string) (*Reader, error) {
	c, err := net.Dial("unix", path)
	if err != nil {
		return nil, fmt.Errorf("unix: dial %s: %w", path, err)
	}
	return &Reader{
		Path: path,
		conn: c,
		done: make(chan struct{}),
	}, nil
}

// Next reads one Event. When ctx is cancelled, the underlying conn is
// closed to unblock the read.
func (r *Reader) Next(ctx context.Context) (*source.Event, error) {
	if r.conn == nil {
		return nil, source.ErrClosed
	}
	if d, ok := ctx.Deadline(); ok {
		_ = r.conn.SetReadDeadline(d)
	} else {
		_ = r.conn.SetReadDeadline(time.Time{})
	}
	var hdr [24]byte
	if _, err := io.ReadFull(r.conn, hdr[:]); err != nil {
		return nil, err
	}
	magic := binary.LittleEndian.Uint64(hdr[0:8])
	if magic != Magic {
		return nil, fmt.Errorf("%w: got %#x", errBadMagic, magic)
	}
	dir := hdr[8]
	connID := binary.LittleEndian.Uint64(hdr[9:17])
	length := binary.LittleEndian.Uint32(hdr[17:21])
	if length > 1<<24 {
		return nil, fmt.Errorf("unix: frame length %d exceeds cap", length)
	}
	payload := make([]byte, length)
	if _, err := io.ReadFull(r.conn, payload); err != nil {
		return nil, err
	}
	return &source.Event{
		ConnID:    connID,
		Direction: dir,
		Payload:   payload,
		At:        time.Now(),
	}, nil
}

// Close shuts down the connection.
func (r *Reader) Close() error {
	if r.conn == nil {
		return nil
	}
	err := r.conn.Close()
	r.conn = nil
	close(r.done)
	return err
}

// WriteFrame is the symmetric helper used by tests and fixtures to
// produce fake captures on a paired socket.
func WriteFrame(w io.Writer, dir uint8, connID uint64, payload []byte) error {
	var hdr [24]byte
	binary.LittleEndian.PutUint64(hdr[0:8], Magic)
	hdr[8] = dir
	binary.LittleEndian.PutUint64(hdr[9:17], connID)
	binary.LittleEndian.PutUint32(hdr[17:21], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}