output/cloudwatch.go

package output

import (
	"bytes"
	"io"
	"sync"
	"time"
)

// CloudWatchOptions tunes the writer's batching behaviour. The defaults
// match what Lambda does with stdout: flush every record.
type CloudWatchOptions struct {
	// FlushEvery triggers a flush when at least FlushEvery records have
	// been queued. 0 means flush on every record.
	FlushEvery int
	// FlushInterval triggers a periodic flush even if FlushEvery hasn't
	// been reached. Zero disables the timer.
	FlushInterval time.Duration
	// MaxLineBytes is the maximum size of a single queued line; lines over
	// this limit are truncated before queueing.
	MaxLineBytes int
}

// CloudWatch returns a writer optimised for the CloudWatch Logs Insights
// format. The writer batches records in memory and flushes them in one
// underlying Write call; callers that need strict ordering with other
// writers should disable batching by leaving the options zero.
func CloudWatch(dst io.Writer, opts CloudWatchOptions) io.Writer {
	w := &cloudwatch{dst: dst, opts: opts, ticker: nil}
	if opts.FlushInterval > 0 {
		w.ticker = time.NewTicker(opts.FlushInterval)
		go w.loop()
	}
	return w
}

type cloudwatch struct {
	dst    io.Writer
	opts   CloudWatchOptions
	mu     sync.Mutex
	buf    bytes.Buffer
	queued int
	ticker *time.Ticker
	closed bool
}

// Write queues p. It never returns a short write.
func (c *cloudwatch) Write(p []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return 0, io.ErrClosedPipe
	}
	if c.opts.MaxLineBytes > 0 && len(p) > c.opts.MaxLineBytes {
		p = p[:c.opts.MaxLineBytes]
	}
	c.buf.Write(p)
	c.queued++
	if c.opts.FlushEvery == 0 || c.queued >= c.opts.FlushEvery {
		if err := c.flushLocked(); err != nil {
			return len(p), err
		}
	}
	return len(p), nil
}

// Flush drains the buffer to the underlying writer.
func (c *cloudwatch) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.flushLocked()
}

func (c *cloudwatch) flushLocked() error {
	if c.buf.Len() == 0 {
		c.queued = 0
		return nil
	}
	_, err := c.dst.Write(c.buf.Bytes())
	c.buf.Reset()
	c.queued = 0
	return err
}

// Close stops the periodic timer and flushes remaining data.
func (c *cloudwatch) Close() error {
	c.mu.Lock()
	c.closed = true
	if c.ticker != nil {
		c.ticker.Stop()
	}
	err := c.flushLocked()
	c.mu.Unlock()
	return err
}

func (c *cloudwatch) loop() {
	for range c.ticker.C {
		_ = c.Flush()
	}
}

// Flusher lets callers manually flush the writer.
type Flusher interface {
	Flush() error
}