//! Bounded channel that applies backpressure between the tailer and the
//! renderer.
//!
//! If rendering can't keep up (e.g. because the terminal is slow), we'd
//! rather block the producing tasks than grow memory without bound. The
//! channel here is intentionally small; the "drop" policy below is opt-in.
use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
use crate::source::LineItem;
pub struct Backpressure {
tx: Sender<LineItem>,
dropped: u64,
policy: DropPolicy,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
/// Block the producer until there's room. Safe default.
Block,
/// Drop new lines when the channel is full and count them.
DropNewest,
}
impl Backpressure {
pub fn new(capacity: usize, policy: DropPolicy) -> (Self, Receiver<LineItem>) {
let (tx, rx) = bounded(capacity.max(1));
(
Self {
tx,
dropped: 0,
policy,
},
rx,
)
}
pub fn send(&mut self, item: LineItem) -> Result<(), SendError> {
match self.policy {
DropPolicy::Block => self
.tx
.send(item)
.map_err(|_| SendError::Disconnected),
DropPolicy::DropNewest => match self.tx.try_send(item) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.dropped += 1;
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SendError::Disconnected),
},
}
}
pub fn dropped(&self) -> u64 {
self.dropped
}
}
#[derive(Debug, thiserror::Error)]
pub enum SendError {
#[error("consumer has disconnected")]
Disconnected,
}