// Single worker loop. See pool.go for orchestration.
//
// See mercemay.top/src/portr/ for context.
package worker
import (
"context"
"net"
"strconv"
"time"
"github.com/mercemay/portr/internal/check/rate"
"github.com/mercemay/portr/internal/check/result"
"github.com/mercemay/portr/internal/check/retry"
"github.com/mercemay/portr/internal/config/target"
)
// Worker pulls jobs from a channel and writes results. It is not safe
// to share between goroutines; the Pool allocates one per worker slot.
//
// The fields below are the collaborators probe uses for every target.
type Worker struct {
// id identifies this worker. Presumably the slot index assigned by the
// Pool; it is not read in the code visible here (TODO confirm in pool.go).
id int
// dial opens the connection for a probe. probe calls it with network
// "tcp" and the target's joined host:port address.
dial DialFunc
// limiter is waited on before every dial attempt.
limiter *rate.Limiter
// backoff supplies the retry bound (Max) and the pause to take after a
// failed attempt (Delay).
backoff *retry.Backoff
}
// loop receives targets from in, probes each one, and sends the outcome on
// out. It returns when in is closed or when ctx is cancelled, whichever
// comes first.
//
// Both channel operations are guarded by ctx so the goroutine cannot be
// stranded after cancellation: it no longer waits forever on an in channel
// that is never closed, nor on an out channel that nobody is draining. A
// target received after cancellation is dropped without being probed, and a
// result that cannot be delivered before cancellation is discarded.
func (w *Worker) loop(ctx context.Context, in <-chan target.Target, out chan<- result.Result) {
	for {
		var t target.Target
		select {
		case <-ctx.Done():
			return
		case next, ok := <-in:
			if !ok {
				// Producer closed the channel: no more work.
				return
			}
			t = next
		}

		// select picks at random when both cases are ready, so re-check
		// here to make sure cancellation always wins over a pending job.
		if ctx.Err() != nil {
			return
		}

		// Run the probe before entering the select so the send case only
		// covers delivery, not the (potentially slow) dial and retries.
		res := w.probe(ctx, t)
		select {
		case <-ctx.Done():
			return
		case out <- res:
		}
	}
}
// probe tries to open a TCP connection to t, retrying on failure, and
// reports the outcome as a result.Result.
//
// At most w.backoff.Max()+1 dials are made. Every dial is preceded by a wait
// on the rate limiter, and every failed dial except the last is followed by
// a pause of w.backoff.Delay(attempt). The elapsed value passed to the
// result is the duration of the most recent dial only, not the total time
// spent; attempts is the number of dials actually performed.
//
// Outcomes:
//   - a dial succeeds: the connection is closed and result.Success is returned;
//   - the limiter wait fails: result.Failure carrying the limiter's error;
//   - ctx is cancelled during a backoff pause: result.Failure carrying ctx.Err();
//   - all attempts fail: result.Failure carrying the last dial error.
func (w *Worker) probe(ctx context.Context, t target.Target) result.Result {
	addr := net.JoinHostPort(t.Host, strconv.Itoa(t.Port))

	// Read the retry bound once so the loop condition and the
	// "was this the last attempt?" check below always agree.
	maxRetries := w.backoff.Max()

	var (
		lastErr  error
		elapsed  time.Duration
		attempts int
	)
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err := w.limiter.Wait(ctx); err != nil {
			return result.Failure(t, err, elapsed, attempts)
		}

		start := time.Now()
		conn, err := w.dial(ctx, "tcp", addr)
		elapsed = time.Since(start)
		attempts++
		if err == nil {
			// Only reachability matters here; a close error on a
			// connection we never used carries no information.
			_ = conn.Close()
			return result.Success(t, elapsed, attempts)
		}
		lastErr = err

		// No further attempt will follow, so sleeping here would only
		// delay reporting the failure.
		if attempt == maxRetries {
			break
		}

		// Use an explicit timer rather than time.After so it can be
		// released immediately when the context is cancelled.
		timer := time.NewTimer(w.backoff.Delay(attempt))
		select {
		case <-ctx.Done():
			timer.Stop()
			return result.Failure(t, ctx.Err(), elapsed, attempts)
		case <-timer.C:
		}
	}
	return result.Failure(t, lastErr, elapsed, attempts)
}