//! Spawn child sources and merge them fairly into one channel.
//!
//! The merger applies two rules:
//!
//! 1. If a line carries a parseable timestamp prefix, it is buffered until all
//! other child sources have reported at least one line more recent than it
//! (or gone idle). This preserves chronological order for batches that
//! arrive in short bursts.
//! 2. Otherwise, the line is forwarded in round-robin arrival order.
use std::time::Duration;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::source::{LineItem, Source};
use super::MultiSource;
/// Configuration for merging several child sources into one channel.
///
/// Construct it with [`Default::default`], adjust it with the chained
/// setters, and finish with `build`.
#[derive(Debug, Clone)]
pub struct MergerBuilder {
    /// Capacity of the bounded channel shared by all child tasks.
    buffer: usize,
    /// Idle timeout for a child source.
    ///
    /// NOTE(review): this value is stored and settable, but nothing in this
    /// file reads it yet; presumably it backs the "gone idle" rule described
    /// in the module docs. Confirm against the consumer.
    idle_timeout: Duration,
}
impl Default for MergerBuilder {
    /// Returns a builder with a 1024-slot channel and a 50 ms idle timeout.
    fn default() -> Self {
        let buffer = 1024;
        let idle_timeout = Duration::from_millis(50);
        Self {
            buffer,
            idle_timeout,
        }
    }
}
impl MergerBuilder {
    /// Sets the capacity of the shared channel.
    ///
    /// Values below 1 are raised to 1, so the stored capacity is never zero.
    pub fn buffer(self, n: usize) -> Self {
        Self {
            buffer: n.max(1),
            ..self
        }
    }

    /// Sets the idle timeout.
    ///
    /// NOTE(review): the value is only stored here; `build` in this file does
    /// not read it.
    pub fn idle_timeout(self, d: Duration) -> Self {
        Self {
            idle_timeout: d,
            ..self
        }
    }

    /// Consumes the builder and spawns one forwarding task per child source.
    ///
    /// Every task pushes its lines into a single bounded channel whose
    /// capacity is the configured `buffer`. Returns the [`MultiSource`]
    /// wrapping the receiving end (tagged with `label`) together with the
    /// join handles of the spawned tasks, in the same order as `sources`.
    pub fn build<S: Source + 'static>(
        self,
        sources: Vec<S>,
        label: impl Into<String>,
    ) -> (MultiSource, Vec<JoinHandle<Result<()>>>) {
        let (tx, rx) = mpsc::channel::<LineItem>(self.buffer);

        let handles: Vec<JoinHandle<Result<()>>> = sources
            .into_iter()
            .map(|mut child| {
                // Each task owns its own sender clone.
                let child_tx = tx.clone();
                tokio::spawn(async move { run_child(&mut child, child_tx).await })
            })
            .collect();

        // Release the builder's own sender so that only the child tasks keep
        // the channel open; it closes once the last of them finishes.
        drop(tx);

        let merged = MultiSource::new(rx, label.into());
        (merged, handles)
    }
}
/// Forwards every line produced by `source` into `tx`.
///
/// The loop ends with `Ok(())` when the source reports no more lines or when
/// the receiving side of the channel has gone away (the send fails). An error
/// from `next_line` is propagated to the caller.
async fn run_child<S: Source + ?Sized>(
    source: &mut S,
    tx: mpsc::Sender<LineItem>,
) -> Result<()> {
    loop {
        match source.next_line().await? {
            // Source exhausted: nothing more to forward.
            None => return Ok(()),
            Some(item) => {
                // A failed send means nobody is listening any more; stop
                // quietly rather than treating it as an error.
                if tx.send(item).await.is_err() {
                    return Ok(());
                }
            }
        }
    }
}
/// Very small timestamp-prefix extractor.
///
/// Expects the line to start with `YYYY-MM-DDThh:mm:ss` (19 ASCII bytes:
/// digits separated by `-`, `-`, `T`, `:`, `:`) and returns that raw prefix
/// as a slice of `line`.
///
/// Returns `None` when the line is shorter than the pattern, when any
/// position holds the wrong character, or when the line contains non-ASCII
/// text inside the first 19 bytes. Only the shape is checked; field ranges
/// are not validated, so `9999-99-99T99:99:99` is accepted.
pub fn leading_timestamp(line: &str) -> Option<&str> {
    const LEN: usize = "YYYY-MM-DDThh:mm:ss".len();

    // `str::get` yields `None` both for too-short input and when byte `LEN`
    // falls inside a multi-byte character; plain slicing would panic in the
    // latter case. Such a line cannot match anyway, because it has a
    // non-ASCII byte within the prefix.
    let head = line.get(..LEN)?;

    let well_formed = head.bytes().enumerate().all(|(i, b)| match i {
        4 | 7 => b == b'-',
        10 => b == b'T',
        13 | 16 => b == b':',
        _ => b.is_ascii_digit(),
    });

    if well_formed {
        Some(head)
    } else {
        None
    }
}