src/tail/engine/poll.rs

//! Polling fallback.
//!
//! A simple stat-every-200ms loop. Used when `inotify` / `kqueue` are
//! unavailable, and useful as the ground truth implementation that we exercise
//! in unit tests.

use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc;
use tokio::time;

use crate::source::file::rotation::{Delta, Snapshot};

use super::Event;

pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(200);

pub async fn run(paths: Vec<PathBuf>, tx: mpsc::Sender<Event>) -> Result<()> {
    run_every(paths, tx, DEFAULT_INTERVAL).await
}

pub async fn run_every(
    paths: Vec<PathBuf>,
    tx: mpsc::Sender<Event>,
    interval: Duration,
) -> Result<()> {
    let mut state: HashMap<PathBuf, Snapshot> = HashMap::new();
    for p in &paths {
        if let Ok(snap) = Snapshot::of(p).await {
            state.insert(p.clone(), snap);
        }
    }
    let mut ticker = time::interval(interval);
    ticker.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
    loop {
        ticker.tick().await;
        for p in &paths {
            let current = match Snapshot::of(p).await {
                Ok(s) => s,
                Err(_) => {
                    if tx.send(Event::Gone(p.clone())).await.is_err() {
                        return Ok(());
                    }
                    state.remove(p);
                    continue;
                }
            };
            let delta = match state.get(p) {
                Some(prev) => crate::source::file::rotation::compare(prev, &current),
                None => Delta::Appended,
            };
            let event = match delta {
                Delta::Appended => Some(Event::Grew(p.clone())),
                Delta::Truncated => Some(Event::Shrunk(p.clone())),
                Delta::Rotated | Delta::Replaced => Some(Event::Rotated(p.clone())),
                Delta::Unchanged => None,
            };
            state.insert(p.clone(), current);
            if let Some(e) = event {
                if tx.send(e).await.is_err() {
                    return Ok(());
                }
            }
        }
    }
}