//! Polling fallback.
//!
//! A simple stat-every-200ms loop. Used when `inotify` / `kqueue` are
//! unavailable, and useful as the ground truth implementation that we exercise
//! in unit tests.
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio::time;
use crate::source::file::rotation::{Delta, Snapshot};
use super::Event;
pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(200);
pub async fn run(paths: Vec<PathBuf>, tx: mpsc::Sender<Event>) -> Result<()> {
run_every(paths, tx, DEFAULT_INTERVAL).await
}
pub async fn run_every(
paths: Vec<PathBuf>,
tx: mpsc::Sender<Event>,
interval: Duration,
) -> Result<()> {
let mut state: HashMap<PathBuf, Snapshot> = HashMap::new();
for p in &paths {
if let Ok(snap) = Snapshot::of(p).await {
state.insert(p.clone(), snap);
}
}
let mut ticker = time::interval(interval);
ticker.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
for p in &paths {
let current = match Snapshot::of(p).await {
Ok(s) => s,
Err(_) => {
if tx.send(Event::Gone(p.clone())).await.is_err() {
return Ok(());
}
state.remove(p);
continue;
}
};
let delta = match state.get(p) {
Some(prev) => crate::source::file::rotation::compare(prev, ¤t),
None => Delta::Appended,
};
let event = match delta {
Delta::Appended => Some(Event::Grew(p.clone())),
Delta::Truncated => Some(Event::Shrunk(p.clone())),
Delta::Rotated | Delta::Replaced => Some(Event::Rotated(p.clone())),
Delta::Unchanged => None,
};
state.insert(p.clone(), current);
if let Some(e) = event {
if tx.send(e).await.is_err() {
return Ok(());
}
}
}
}
}