src/tail.rs

//! File tailing with a polling fallback.
//!
//! We deliberately avoid `inotify` dependencies here to stay portable; a
//! 200ms polling loop is good enough for log-watching ergonomics, and we
//! detect rotation by comparing inode and file length. On Linux, an inotify
//! implementation could be layered on top and would slot in behind the same
//! [`TailEvent`] channel.

use std::path::PathBuf;
use std::time::Duration;

use anyhow::{Context, Result};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::sleep;

/// One event emitted by the tailer.
#[derive(Debug)]
pub enum TailEvent {
    /// A new line was read from the watched file.
    Line { source: String, line: String },
    /// The file appears to have been rotated.
    Rotation { source: String },
    /// A non-fatal error; the loop will keep running.
    Error { source: String, err: String },
}

/// Handle to a running tail task. Dropping it does not stop the task;
/// call [`TailHandle::shutdown`] for graceful exit.
#[derive(Debug)]
pub struct TailHandle {
    source: String,
    task: Option<JoinHandle<()>>,
    cancel: tokio::sync::watch::Sender<bool>,
}

impl TailHandle {
    pub fn spawn(
        path: PathBuf,
        tx: Sender<TailEvent>,
        preload_lines: usize,
        no_follow: bool,
        since: Option<Duration>,
    ) -> Result<Self> {
        let source = path
            .file_name()
            .map(|s| s.to_string_lossy().into_owned())
            .unwrap_or_else(|| path.display().to_string());
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let task = tokio::spawn(run(
            path,
            source.clone(),
            tx,
            preload_lines,
            no_follow,
            since,
            cancel_rx,
        ));
        Ok(Self { source, task: Some(task), cancel: cancel_tx })
    }

    pub async fn shutdown(mut self) {
        let _ = self.cancel.send(true);
        if let Some(task) = self.task.take() {
            let _ = task.await;
        }
    }

    pub fn source(&self) -> &str {
        &self.source
    }
}

async fn run(
    path: PathBuf,
    source: String,
    tx: Sender<TailEvent>,
    preload_lines: usize,
    no_follow: bool,
    _since: Option<Duration>,
    mut cancel: tokio::sync::watch::Receiver<bool>,
) {
    let mut file = match File::open(&path).await.context("open") {
        Ok(f) => f,
        Err(err) => {
            let _ = tx.send(TailEvent::Error { source, err: err.to_string() }).await;
            return;
        }
    };

    let meta_at_start = file.metadata().await.ok();
    let mut start_inode = meta_at_start.as_ref().and_then(inode_of);
    let mut start_len = meta_at_start.as_ref().map(|m| m.len()).unwrap_or(0);

    if preload_lines == 0 {
        let _ = file.seek(SeekFrom::End(0)).await;
    }
    let mut reader = BufReader::new(file);

    loop {
        if *cancel.borrow() { return; }

        let mut buf = String::new();
        match reader.read_line(&mut buf).await {
            Ok(0) => {
                if no_follow { return; }
                // Check for rotation.
                if let Ok(meta) = tokio::fs::metadata(&path).await {
                    let cur_inode = inode_of(&meta);
                    let cur_len = meta.len();
                    if cur_inode != start_inode || cur_len < start_len {
                        let _ = tx.send(TailEvent::Rotation { source: source.clone() }).await;
                        if let Ok(f) = File::open(&path).await {
                            reader = BufReader::new(f);
                            start_inode = cur_inode;
                            start_len = cur_len;
                        }
                    } else {
                        start_len = cur_len;
                    }
                }
                sleep(Duration::from_millis(200)).await;
            }
            Ok(_) => {
                while buf.ends_with('\n') || buf.ends_with('\r') { buf.pop(); }
                let _ = tx.send(TailEvent::Line { source: source.clone(), line: buf }).await;
            }
            Err(err) => {
                let _ = tx.send(TailEvent::Error { source: source.clone(), err: err.to_string() }).await;
                sleep(Duration::from_millis(500)).await;
            }
        }
    }
}

#[cfg(unix)]
fn inode_of(m: &std::fs::Metadata) -> Option<u64> {
    use std::os::unix::fs::MetadataExt;
    Some(m.ino())
}

#[cfg(not(unix))]
fn inode_of(_m: &std::fs::Metadata) -> Option<u64> { None }