internal/source/stdin.go

package source

import (
	"bufio"
	"bytes"
	"context"
	"io"
	"os"
	"time"
)

// Stdin is a quick-and-dirty source that reads an RFC-style HTTP message
// pasted on standard input. It exits after the first blank line following
// a body of declared length, or when stdin returns EOF.
type Stdin struct {
	r    *bufio.Reader
	sent bool
}

// NewStdin wraps os.Stdin.
func NewStdin() *Stdin {
	return &Stdin{r: bufio.NewReader(os.Stdin)}
}

// Next returns exactly one event, containing everything up to the next
// double-CRLF (plus any body declared by Content-Length).
func (s *Stdin) Next(ctx context.Context) (*Event, error) {
	if s.sent {
		return nil, io.EOF
	}
	var buf bytes.Buffer
	for {
		line, err := s.r.ReadBytes('\n')
		buf.Write(line)
		if err != nil {
			if buf.Len() == 0 {
				return nil, io.EOF
			}
			break
		}
		if bytes.Equal(line, []byte("\r\n")) || bytes.Equal(line, []byte("\n")) {
			// Look for Content-Length in what we have so far.
			if n := parseContentLength(buf.Bytes()); n > 0 {
				body := make([]byte, n)
				if _, err := io.ReadFull(s.r, body); err != nil && err != io.EOF {
					return nil, err
				}
				buf.Write(body)
			}
			break
		}
	}
	s.sent = true
	return &Event{
		Direction: 0,
		Payload:   buf.Bytes(),
		At:        time.Now(),
	}, nil
}

// Close is a no-op because we do not own os.Stdin.
func (s *Stdin) Close() error { return nil }

// parseContentLength is a hacky scan kept here to avoid a dep cycle with
// internal/parser/http1. It returns 0 when the header is absent.
func parseContentLength(head []byte) int {
	const prefix = "\nContent-Length:"
	i := bytes.Index(bytes.ToLower(head), []byte("\ncontent-length:"))
	if i < 0 {
		return 0
	}
	end := bytes.IndexByte(head[i+1:], '\n')
	if end < 0 {
		return 0
	}
	line := head[i+len(prefix) : i+1+end]
	n := 0
	for _, c := range bytes.TrimSpace(line) {
		if c < '0' || c > '9' {
			break
		}
		n = n*10 + int(c-'0')
	}
	return n
}