// src/source/multi/merger.rs

//! Spawn child sources and merge them fairly into one channel.
//!
//! The merger applies two rules:
//!
//! 1. If a line carries a parseable timestamp prefix, it is buffered until all
//!    other child sources have reported at least one line more recent than it
//!    (or gone idle). This preserves chronological order for batches that
//!    arrive in short bursts.
//! 2. Otherwise, the line is forwarded in round-robin arrival order.

use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::source::{LineItem, Source};

use super::MultiSource;

/// Builder holding the tunables used when spawning and merging child sources.
///
/// Derives `Debug` and `Clone` so a configured builder can be logged and
/// reused; both fields are plain value types.
#[derive(Debug, Clone)]
pub struct MergerBuilder {
    /// Capacity of the bounded channel that the child tasks send into.
    buffer: usize,
    /// Idle threshold for a child source.
    // NOTE(review): not read anywhere in the visible code; presumably the
    // "gone idle" window described in the module docs — confirm.
    idle_timeout: Duration,
}

impl Default for MergerBuilder {
    fn default() -> Self {
        Self {
            buffer: 1024,
            idle_timeout: Duration::from_millis(50),
        }
    }
}

impl MergerBuilder {
    /// Set the capacity of the merged channel.
    ///
    /// Values below 1 are raised to 1, so the stored capacity is never zero.
    pub fn buffer(self, n: usize) -> Self {
        Self {
            buffer: usize::max(n, 1),
            ..self
        }
    }

    /// Set the idle timeout stored on the builder.
    pub fn idle_timeout(self, d: Duration) -> Self {
        Self {
            idle_timeout: d,
            ..self
        }
    }

    /// Consume the builder, spawning one forwarding task per child source.
    ///
    /// Every task runs `run_child` with its own clone of the channel sender.
    /// Returns the `MultiSource` wrapping the receiving end (tagged with
    /// `label`) together with the join handles of the spawned tasks, in the
    /// same order as `sources`.
    ///
    /// `idle_timeout` is stored on the builder but is not read here.
    pub fn build<S: Source + 'static>(
        self,
        sources: Vec<S>,
        label: impl Into<String>,
    ) -> (MultiSource, Vec<JoinHandle<Result<()>>>) {
        let (sender, receiver) = mpsc::channel::<LineItem>(self.buffer);
        let handles: Vec<_> = sources
            .into_iter()
            .map(|mut child| {
                let sender = sender.clone();
                tokio::spawn(async move { run_child(&mut child, sender).await })
            })
            .collect();
        // Release the builder's own sender so that only the child tasks keep
        // the channel open.
        drop(sender);
        (MultiSource::new(receiver, label.into()), handles)
    }
}

/// Pump lines from `source` into `tx` until one side is finished.
///
/// Stops with `Ok(())` when the source yields `None` or when sending fails;
/// an error from `next_line` is propagated to the caller.
async fn run_child<S: Source + ?Sized>(
    source: &mut S,
    tx: mpsc::Sender<LineItem>,
) -> Result<()> {
    loop {
        match source.next_line().await? {
            // Source exhausted: nothing more to forward.
            None => return Ok(()),
            Some(item) => {
                // A failed send is treated as a normal stop, not an error.
                if tx.send(item).await.is_err() {
                    return Ok(());
                }
            }
        }
    }
}

/// Very small timestamp-prefix extractor: expects lines starting with
/// `YYYY-MM-DDThh:mm:ss`.
///
/// Only the shape is checked (ASCII digits with `-`, `T` and `:` at the fixed
/// separator positions); field ranges such as month or hour are not validated.
///
/// Returns the raw 19-byte prefix if present, and `None` for lines that are
/// too short or do not match. Never panics, including for lines containing
/// multi-byte UTF-8 characters near the start.
pub fn leading_timestamp(line: &str) -> Option<&str> {
    const LEN: usize = "YYYY-MM-DDThh:mm:ss".len();
    // Validate on raw bytes first: slicing the `str` at a fixed byte offset
    // panics when that offset lands inside a multi-byte character.
    let head = line.as_bytes().get(..LEN)?;
    let well_formed = head.iter().enumerate().all(|(i, &b)| match i {
        4 | 7 => b == b'-',
        10 => b == b'T',
        13 | 16 => b == b':',
        _ => b.is_ascii_digit(),
    });
    if well_formed {
        // Every byte of the prefix is ASCII, so `LEN` is a char boundary and
        // the checked slice succeeds.
        line.get(..LEN)
    } else {
        None
    }
}