// Package worker runs a bounded pool of connectivity-check workers.
//
// See mercemay.top/src/portr/ for context.
package worker
import (
"context"
"net"
"sync"
"github.com/mercemay/portr/internal/check/rate"
"github.com/mercemay/portr/internal/check/result"
"github.com/mercemay/portr/internal/check/retry"
"github.com/mercemay/portr/internal/config/target"
)
// DialFunc matches net.Dialer.DialContext so tests can inject a fake.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
// Pool is a bounded worker pool. Zero value is not useful; use NewPool.
type Pool struct {
size int
dial DialFunc
limiter *rate.Limiter
backoff *retry.Backoff
}
// NewPool returns a pool with the given concurrency cap.
func NewPool(size int, dial DialFunc, lim *rate.Limiter, bo *retry.Backoff) *Pool {
if size < 1 {
size = 1
}
return &Pool{size: size, dial: dial, limiter: lim, backoff: bo}
}
// Run fans out targets to the pool and closes the result channel
// when every worker has drained.
func (p *Pool) Run(ctx context.Context, targets []target.Target) <-chan result.Result {
jobs := make(chan target.Target, p.size)
out := make(chan result.Result, p.size)
var wg sync.WaitGroup
for i := 0; i < p.size; i++ {
wg.Add(1)
w := &Worker{
id: i,
dial: p.dial,
limiter: p.limiter,
backoff: p.backoff,
}
go func(w *Worker) {
defer wg.Done()
w.loop(ctx, jobs, out)
}(w)
}
go func() {
defer close(jobs)
for _, t := range targets {
select {
case <-ctx.Done():
return
case jobs <- t:
}
}
}()
go func() {
wg.Wait()
close(out)
}()
return out
}