// internal/check/worker/pool.go

// Package worker runs a bounded pool of connectivity-check workers.
//
// See mercemay.top/src/portr/ for context.
package worker

import (
	"context"
	"net"
	"sync"

	"github.com/mercemay/portr/internal/check/rate"
	"github.com/mercemay/portr/internal/check/result"
	"github.com/mercemay/portr/internal/check/retry"
	"github.com/mercemay/portr/internal/config/target"
)

// DialFunc opens a connection to address on the named network, honouring ctx.
// Its signature matches net.Dialer.DialContext, so production code can pass
// that method directly while tests inject a fake.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)

// Pool is a bounded worker pool. Zero value is not useful; use NewPool.
//
// A Pool only holds configuration; the goroutines and channels are created
// by Run on each call.
type Pool struct {
	// size is the number of workers Run starts and also the buffer capacity
	// of the job and result channels. NewPool clamps it to at least 1.
	size    int
	// dial is the connection function handed to every worker.
	dial    DialFunc
	// limiter is passed to every worker as the same pointer, so all workers
	// started from this Pool share it.
	limiter *rate.Limiter
	// backoff is passed to every worker as the same pointer, so all workers
	// started from this Pool share it.
	backoff *retry.Backoff
}

// NewPool builds a Pool whose concurrency cap is size. Any size below 1 is
// raised to 1 so that the pool always has at least one worker. dial, lim and
// bo are stored as given and handed to each worker when Run is called.
func NewPool(size int, dial DialFunc, lim *rate.Limiter, bo *retry.Backoff) *Pool {
	// Start from the minimum worker count and only override it for a
	// larger, valid request.
	pool := &Pool{
		size:    1,
		dial:    dial,
		limiter: lim,
		backoff: bo,
	}
	if size > 1 {
		pool.size = size
	}
	return pool
}

// Run starts p.size workers, feeds them every entry of targets, and returns
// the channel the workers are given for their results. That channel is closed
// once every worker's loop has returned.
//
// Feeding stops early when ctx is cancelled; targets not yet queued at that
// point are dropped. The job channel is always closed when feeding ends.
// Callers should keep receiving until the returned channel is closed
// (presumably workers block on a full result channel; confirm against
// Worker.loop).
func (p *Pool) Run(ctx context.Context, targets []target.Target) <-chan result.Result {
	var (
		queue   = make(chan target.Target, p.size)
		results = make(chan result.Result, p.size)
		workers sync.WaitGroup
	)

	// Launch the workers before the feeder so the queue has consumers.
	workers.Add(p.size)
	for id := 0; id < p.size; id++ {
		// wk is declared per iteration, so each goroutine captures its own.
		wk := &Worker{id: id, dial: p.dial, limiter: p.limiter, backoff: p.backoff}
		go func() {
			defer workers.Done()
			wk.loop(ctx, queue, results)
		}()
	}

	// Feeder: the only sender on queue, hence the one that closes it.
	go func() {
		defer close(queue)
		for _, tgt := range targets {
			select {
			case queue <- tgt:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Closer: results may only be closed after the last worker has returned.
	go func() {
		defer close(results)
		workers.Wait()
	}()

	return results
}