//! File tailing with a polling fallback.
//!
//! We deliberately avoid `inotify` dependencies here to stay portable; a
//! 200ms polling loop is good enough for log-watching ergonomics, and we
//! detect rotation by comparing inode and file length. On Linux, an inotify
//! implementation could be layered on top and would slot in behind the same
//! [`TailEvent`] channel.
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context, Result};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::sleep;
/// One event emitted by the tailer.
///
/// Every variant carries `source`, the label of the watched file that the
/// tail loop was started with, so a consumer reading several tailers through
/// one channel can tell them apart.
#[derive(Debug)]
pub enum TailEvent {
/// A new line was read from the watched file. `line` holds the text with
/// its trailing `\n` / `\r` characters removed.
Line { source: String, line: String },
/// The file appears to have been rotated: at an end-of-file check the path
/// resolved to a different inode, or the file was shorter than the length
/// recorded at the previous check.
Rotation { source: String },
/// An error report; `err` is the error rendered as text. Read errors are
/// non-fatal and the loop keeps running. A failure to open the file when
/// the tail task starts is also reported with this variant, after which
/// the task exits.
Error { source: String, err: String },
}
/// Handle to a running tail task. Dropping it does not stop the task;
/// call [`TailHandle::shutdown`] for graceful exit.
#[derive(Debug)]
pub struct TailHandle {
// Label attached to every event of this tailer; returned by `source()`.
source: String,
// Join handle of the spawned task. Wrapped in `Option` so `shutdown` can
// `take()` it out and await it.
task: Option<JoinHandle<()>>,
// Cancellation flag observed by the task; `shutdown` sets it to `true`.
cancel: tokio::sync::watch::Sender<bool>,
}
impl TailHandle {
    /// Starts a background task that tails `path` and returns its handle.
    ///
    /// Events are delivered on `tx`. Their `source` label is the final
    /// component of `path` (lossily converted to UTF-8), or the whole path as
    /// displayed when it has no final component. `preload_lines`, `no_follow`
    /// and `since` are handed to the tail loop unchanged.
    ///
    /// The task is started with `tokio::spawn`, so this has to be called from
    /// within a Tokio runtime.
    ///
    /// # Errors
    ///
    /// The code shown here always returns `Ok`; a failure to open the file is
    /// reported asynchronously as a [`TailEvent::Error`] on `tx`.
    pub fn spawn(
        path: PathBuf,
        tx: Sender<TailEvent>,
        preload_lines: usize,
        no_follow: bool,
        since: Option<Duration>,
    ) -> Result<Self> {
        let source = match path.file_name() {
            Some(name) => name.to_string_lossy().into_owned(),
            None => path.display().to_string(),
        };
        let (cancel, cancel_rx) = tokio::sync::watch::channel(false);
        let worker = run(
            path,
            source.clone(),
            tx,
            preload_lines,
            no_follow,
            since,
            cancel_rx,
        );
        let task = Some(tokio::spawn(worker));
        Ok(Self { source, task, cancel })
    }

    /// Asks the tail task to stop and waits until it has finished.
    ///
    /// Failures of both the cancel signal and the join are ignored.
    pub async fn shutdown(mut self) {
        let running = self.task.take();
        // The send result is ignored on purpose: shutdown is best-effort.
        let _ = self.cancel.send(true);
        if let Some(handle) = running {
            // A join error is not propagated to the caller.
            let _ = handle.await;
        }
    }

    /// Label attached to every event produced by this tailer.
    pub fn source(&self) -> &str {
        self.source.as_str()
    }
}
async fn run(
path: PathBuf,
source: String,
tx: Sender<TailEvent>,
preload_lines: usize,
no_follow: bool,
_since: Option<Duration>,
mut cancel: tokio::sync::watch::Receiver<bool>,
) {
let mut file = match File::open(&path).await.context("open") {
Ok(f) => f,
Err(err) => {
let _ = tx.send(TailEvent::Error { source, err: err.to_string() }).await;
return;
}
};
let meta_at_start = file.metadata().await.ok();
let mut start_inode = meta_at_start.as_ref().and_then(inode_of);
let mut start_len = meta_at_start.as_ref().map(|m| m.len()).unwrap_or(0);
if preload_lines == 0 {
let _ = file.seek(SeekFrom::End(0)).await;
}
let mut reader = BufReader::new(file);
loop {
if *cancel.borrow() { return; }
let mut buf = String::new();
match reader.read_line(&mut buf).await {
Ok(0) => {
if no_follow { return; }
// Check for rotation.
if let Ok(meta) = tokio::fs::metadata(&path).await {
let cur_inode = inode_of(&meta);
let cur_len = meta.len();
if cur_inode != start_inode || cur_len < start_len {
let _ = tx.send(TailEvent::Rotation { source: source.clone() }).await;
if let Ok(f) = File::open(&path).await {
reader = BufReader::new(f);
start_inode = cur_inode;
start_len = cur_len;
}
} else {
start_len = cur_len;
}
}
sleep(Duration::from_millis(200)).await;
}
Ok(_) => {
while buf.ends_with('\n') || buf.ends_with('\r') { buf.pop(); }
let _ = tx.send(TailEvent::Line { source: source.clone(), line: buf }).await;
}
Err(err) => {
let _ = tx.send(TailEvent::Error { source: source.clone(), err: err.to_string() }).await;
sleep(Duration::from_millis(500)).await;
}
}
}
}
/// Inode number recorded in `m`, used by the tail loop to notice that the
/// watched path now points at a different file.
#[cfg(unix)]
fn inode_of(m: &std::fs::Metadata) -> Option<u64> {
    let ino = std::os::unix::fs::MetadataExt::ino(m);
    Some(ino)
}

/// Non-Unix variant: always `None`, so comparing two results never reports a
/// difference on these targets.
#[cfg(not(unix))]
fn inode_of(_m: &std::fs::Metadata) -> Option<u64> {
    None
}