//! File-backed [`Source`].
//!
//! Opens a path in read-mode, tracks the current offset, and re-enters the
//! read loop whenever the tailing engine signals new data is available.
pub mod rotation;
pub mod seek;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use super::{LineItem, Source};
pub struct FileSource {
path: PathBuf,
label: String,
reader: BufReader<File>,
offset: u64,
seq: u64,
}
impl FileSource {
/// Open `path` and start reading from the beginning.
pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let label = path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_else(|| path.display().to_string());
let file = File::open(&path).await
.with_context(|| format!("opening {}", path.display()))?;
Ok(Self {
path,
label,
reader: BufReader::new(file),
offset: 0,
seq: 0,
})
}
/// Open `path` and seek so only new lines (appended after this call) will
/// be yielded. See [`seek::from_end`] for the underlying algorithm.
pub async fn follow(path: impl AsRef<Path>) -> Result<Self> {
let mut src = Self::open(path).await?;
let end = src.reader.get_mut().seek(SeekFrom::End(0)).await?;
src.offset = end;
Ok(src)
}
pub fn path(&self) -> &Path {
&self.path
}
pub async fn reopen(&mut self) -> Result<()> {
let file = File::open(&self.path).await
.with_context(|| format!("reopening {}", self.path.display()))?;
self.reader = BufReader::new(file);
self.offset = 0;
Ok(())
}
}
#[async_trait::async_trait]
impl Source for FileSource {
async fn next_line(&mut self) -> Result<Option<LineItem>> {
let mut buf = String::new();
let read = self.reader.read_line(&mut buf).await?;
if read == 0 {
return Ok(None);
}
self.offset += read as u64;
while buf.ends_with('\n') || buf.ends_with('\r') {
buf.pop();
}
self.seq += 1;
Ok(Some(LineItem {
line: buf,
origin: self.label.clone(),
seq: self.seq,
}))
}
fn label(&self) -> &str {
&self.label
}
}