internal/check/worker/worker.go

// Single worker loop. See pool.go for orchestration.
//
// See mercemay.top/src/portr/ for context.
package worker

import (
	"context"
	"net"
	"strconv"
	"time"

	"github.com/mercemay/portr/internal/check/rate"
	"github.com/mercemay/portr/internal/check/result"
	"github.com/mercemay/portr/internal/check/retry"
	"github.com/mercemay/portr/internal/config/target"
)

// Worker pulls jobs from a channel and writes results. It is not safe
// to share between goroutines; the Pool allocates one per worker slot.
//
// While probing a target the worker waits on limiter before every dial
// attempt, and uses backoff both to bound the number of attempts and to
// space out retries after a failed dial.
type Worker struct {
	id      int            // worker identifier; presumably the Pool slot index — TODO confirm
	dial    DialFunc       // opens the probe connection; invoked with network "tcp" and a host:port address
	limiter *rate.Limiter  // waited on (with the probe's context) before each dial attempt
	backoff *retry.Backoff // supplies the retry bound (Max) and the per-attempt delay (Delay)
}

// loop receives targets from in, probes each one, and publishes the outcome
// on out. It returns when in is closed or when ctx is cancelled.
//
// Cancellation is observed at every blocking point: while waiting for the
// next target and while waiting for a reader on out. This keeps the worker
// from hanging on an idle input channel, or on a consumer that has stopped
// reading, after the context has been cancelled. A result that cannot be
// delivered before cancellation is observed is dropped.
func (w *Worker) loop(ctx context.Context, in <-chan target.Target, out chan<- result.Result) {
	for {
		var t target.Target
		select {
		case <-ctx.Done():
			return
		case next, ok := <-in:
			if !ok {
				// Input closed: no more work will arrive.
				return
			}
			t = next
		}

		// select chooses at random when several cases are ready, so re-check
		// the context to guarantee no probe starts after cancellation.
		if ctx.Err() != nil {
			return
		}

		res := w.probe(ctx, t)
		select {
		case out <- res:
		case <-ctx.Done():
			return
		}
	}
}

// probe dials t over TCP, retrying failed attempts according to w.backoff,
// and reports the outcome as a result.Result.
//
// Every attempt first waits on w.limiter; if that wait returns an error the
// probe stops and reports that error. A successful dial closes the connection
// immediately and reports success. A failed dial is retried after
// w.backoff.Delay(attempt) until w.backoff.Max() retries have been used, so
// at most Max()+1 dials are made. No delay is taken after the final attempt,
// since there is nothing left to wait for. If ctx is cancelled during a
// backoff delay the probe reports ctx.Err().
//
// The elapsed value in the result is the duration of the most recent dial
// only, not the total across attempts; attempts counts the dials performed.
func (w *Worker) probe(ctx context.Context, t target.Target) result.Result {
	addr := net.JoinHostPort(t.Host, strconv.Itoa(t.Port))
	maxRetries := w.backoff.Max()

	var (
		lastErr  error
		elapsed  time.Duration
		attempts int
	)
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err := w.limiter.Wait(ctx); err != nil {
			return result.Failure(t, err, elapsed, attempts)
		}

		start := time.Now()
		conn, err := w.dial(ctx, "tcp", addr)
		elapsed = time.Since(start)
		attempts++
		if err == nil {
			// Only reachability matters here; a close error on a connection
			// that carried no data is not actionable.
			_ = conn.Close()
			return result.Success(t, elapsed, attempts)
		}
		lastErr = err

		if attempt == maxRetries {
			// Last attempt failed: report immediately instead of sleeping
			// through a backoff that no retry will follow.
			break
		}

		// Use an explicit timer so it can be released promptly when the
		// context is cancelled mid-backoff.
		timer := time.NewTimer(w.backoff.Delay(attempt))
		select {
		case <-ctx.Done():
			timer.Stop()
			return result.Failure(t, ctx.Err(), elapsed, attempts)
		case <-timer.C:
		}
	}
	return result.Failure(t, lastErr, elapsed, attempts)
}