#[cfg(test)]
mod tests {
use crate::source::LineItem;
use crate::tail::backpressure::{Backpressure, DropPolicy, SendError};
/// Test fixture: builds a `LineItem` whose `seq` is `n`, whose `line` is
/// `"line-<n>"`, and whose `origin` is `"test"`.
fn item(n: u64) -> LineItem {
    let line = format!("line-{}", n);
    let origin = "test".into();
    let seq = n;
    LineItem { line, origin, seq }
}
/// Checks that `DropPolicy::Block` delivers every item, in send order, when
/// four items are pushed through a `Backpressure` built with `4` as its first
/// argument (presumably the queue capacity).
#[test]
fn block_policy_delivers_every_item() {
    let (mut bp, rx) = Backpressure::new(4, DropPolicy::Block);
    (0..4).for_each(|seq| {
        bp.send(item(seq)).unwrap();
    });
    // Release the sending half before draining; presumably this is what lets
    // `rx.iter()` finish instead of waiting for further items.
    drop(bp);
    // Comparing the whole sequence checks both the count and the ordering.
    let seqs: Vec<u64> = rx.iter().map(|delivered| delivered.seq).collect();
    assert_eq!(seqs, vec![0, 1, 2, 3]);
}
/// Checks the drop accounting of `DropPolicy::DropNewest`: five sends into a
/// `Backpressure` created with `2` must leave two items to receive and a
/// `dropped()` count of three.
#[test]
fn drop_newest_policy_counts_drops() {
    let (mut bp, rx) = Backpressure::new(2, DropPolicy::DropNewest);
    for seq in 0..5 {
        // Every send is expected to return `Ok`, including the ones that overflow.
        bp.send(item(seq)).unwrap();
    }
    // Read the counter before `bp` is moved into `drop`.
    let dropped = bp.dropped();
    drop(bp);
    let delivered = rx.iter().count();
    assert_eq!(delivered, 2);
    assert_eq!(dropped, 3);
}
/// Checks that sending after the receiver has been dropped fails with
/// `SendError::Disconnected`.
#[test]
fn disconnected_receiver_errors() {
    let (mut bp, rx) = Backpressure::new(2, DropPolicy::Block);
    // Drop the receiving half before the first send.
    drop(rx);
    let outcome = bp.send(item(0));
    assert!(matches!(outcome, Err(SendError::Disconnected)));
}
/// Checks that a `Backpressure` created with `0` behaves as a queue with
/// exactly one slot.
///
/// Two items are sent under `DropPolicy::DropNewest` with nothing draining the
/// receiver. The expectation mirrors `drop_newest_policy_counts_drops`, where
/// a queue created with `2` keeps two of five items and counts three drops:
/// a one-slot queue must keep one of two items and count one drop.
#[test]
fn zero_capacity_is_promoted_to_one() {
    let (mut bp, rx) = Backpressure::new(0, DropPolicy::DropNewest);
    bp.send(item(1)).unwrap();
    bp.send(item(2)).unwrap();
    // Read the counter before `bp` is moved into `drop`.
    let dropped = bp.dropped();
    drop(bp);
    let got: Vec<_> = rx.iter().collect();
    // `!got.is_empty()` would also pass if the capacity were promoted to two
    // or more; pin the exact size so the test matches its name.
    assert_eq!(got.len(), 1);
    assert_eq!(dropped, 1);
}
/// Checks that a freshly constructed `Backpressure` using
/// `DropPolicy::DropNewest` reports zero drops before anything is sent.
#[test]
fn drop_policy_starts_with_zero_drops() {
    // A named `_receiver` binding (rather than `_`) keeps the receiving half
    // alive until the end of the test.
    let (bp, _receiver) = Backpressure::new(4, DropPolicy::DropNewest);
    let initial_drops = bp.dropped();
    assert_eq!(initial_drops, 0);
}
}