src/tail/backpressure.rs

//! Bounded channel that applies backpressure between the tailer and the
//! renderer.
//!
//! If rendering can't keep up (e.g. because the terminal is slow), we'd
//! rather block the producing tasks than grow memory without bound. The
//! channel here is intentionally small; the "drop" policy below is opt-in.

use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};

use crate::source::LineItem;

pub struct Backpressure {
    tx: Sender<LineItem>,
    dropped: u64,
    policy: DropPolicy,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
    /// Block the producer until there's room. Safe default.
    Block,
    /// Drop new lines when the channel is full and count them.
    DropNewest,
}

impl Backpressure {
    pub fn new(capacity: usize, policy: DropPolicy) -> (Self, Receiver<LineItem>) {
        let (tx, rx) = bounded(capacity.max(1));
        (
            Self {
                tx,
                dropped: 0,
                policy,
            },
            rx,
        )
    }

    pub fn send(&mut self, item: LineItem) -> Result<(), SendError> {
        match self.policy {
            DropPolicy::Block => self
                .tx
                .send(item)
                .map_err(|_| SendError::Disconnected),
            DropPolicy::DropNewest => match self.tx.try_send(item) {
                Ok(()) => Ok(()),
                Err(TrySendError::Full(_)) => {
                    self.dropped += 1;
                    Ok(())
                }
                Err(TrySendError::Disconnected(_)) => Err(SendError::Disconnected),
            },
        }
    }

    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SendError {
    #[error("consumer has disconnected")]
    Disconnected,
}