src/source/file/mod.rs

//! File-backed [`Source`].
//!
//! Opens a path in read-mode, tracks the current offset, and re-enters the
//! read loop whenever the tailing engine signals new data is available.

pub mod rotation;
pub mod seek;

use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};

use super::{LineItem, Source};

pub struct FileSource {
    path: PathBuf,
    label: String,
    reader: BufReader<File>,
    offset: u64,
    seq: u64,
}

impl FileSource {
    /// Open `path` and start reading from the beginning.
    pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let label = path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| path.display().to_string());
        let file = File::open(&path).await
            .with_context(|| format!("opening {}", path.display()))?;
        Ok(Self {
            path,
            label,
            reader: BufReader::new(file),
            offset: 0,
            seq: 0,
        })
    }

    /// Open `path` and seek so only new lines (appended after this call) will
    /// be yielded. See [`seek::from_end`] for the underlying algorithm.
    pub async fn follow(path: impl AsRef<Path>) -> Result<Self> {
        let mut src = Self::open(path).await?;
        let end = src.reader.get_mut().seek(SeekFrom::End(0)).await?;
        src.offset = end;
        Ok(src)
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub async fn reopen(&mut self) -> Result<()> {
        let file = File::open(&self.path).await
            .with_context(|| format!("reopening {}", self.path.display()))?;
        self.reader = BufReader::new(file);
        self.offset = 0;
        Ok(())
    }
}

#[async_trait::async_trait]
impl Source for FileSource {
    async fn next_line(&mut self) -> Result<Option<LineItem>> {
        let mut buf = String::new();
        let read = self.reader.read_line(&mut buf).await?;
        if read == 0 {
            return Ok(None);
        }
        self.offset += read as u64;
        while buf.ends_with('\n') || buf.ends_with('\r') {
            buf.pop();
        }
        self.seq += 1;
        Ok(Some(LineItem {
            line: buf,
            origin: self.label.clone(),
            seq: self.seq,
        }))
    }

    fn label(&self) -> &str {
        &self.label
    }
}