src/tail/backpressure_test.rs

#[cfg(test)]
mod tests {
    use crate::source::LineItem;
    use crate::tail::backpressure::{Backpressure, DropPolicy, SendError};

    fn item(n: u64) -> LineItem {
        LineItem {
            line: format!("line-{}", n),
            origin: "test".into(),
            seq: n,
        }
    }

    #[test]
    fn block_policy_delivers_every_item() {
        let (mut bp, rx) = Backpressure::new(4, DropPolicy::Block);
        for i in 0..4 {
            bp.send(item(i)).unwrap();
        }
        drop(bp);
        let received: Vec<_> = rx.iter().collect();
        assert_eq!(received.len(), 4);
        for (i, item) in received.iter().enumerate() {
            assert_eq!(item.seq, i as u64);
        }
    }

    #[test]
    fn drop_newest_policy_counts_drops() {
        let (mut bp, rx) = Backpressure::new(2, DropPolicy::DropNewest);
        for i in 0..5 {
            bp.send(item(i)).unwrap();
        }
        let dropped = bp.dropped();
        drop(bp);
        let received: Vec<_> = rx.iter().collect();
        assert_eq!(received.len(), 2);
        assert_eq!(dropped, 3);
    }

    #[test]
    fn disconnected_receiver_errors() {
        let (mut bp, rx) = Backpressure::new(2, DropPolicy::Block);
        drop(rx);
        let err = bp.send(item(0)).unwrap_err();
        assert!(matches!(err, SendError::Disconnected));
    }

    #[test]
    fn zero_capacity_is_promoted_to_one() {
        let (mut bp, rx) = Backpressure::new(0, DropPolicy::DropNewest);
        bp.send(item(1)).unwrap();
        bp.send(item(2)).unwrap();
        drop(bp);
        let got: Vec<_> = rx.iter().collect();
        assert!(!got.is_empty());
    }

    #[test]
    fn drop_policy_starts_with_zero_drops() {
        let (bp, _rx) = Backpressure::new(4, DropPolicy::DropNewest);
        assert_eq!(bp.dropped(), 0);
    }
}