package output
import (
"bytes"
"io"
"sync"
"time"
)
// CloudWatchOptions tunes the writer's batching behaviour. The defaults
// match what Lambda does with stdout: flush every record. The zero value
// also means no periodic timer and no line truncation.
type CloudWatchOptions struct {
	// FlushEvery triggers a flush when at least FlushEvery records have
	// been queued. 0 means flush on every record (negative values behave
	// the same way, because the queued count always reaches them).
	FlushEvery int
	// FlushInterval triggers a periodic flush even if FlushEvery hasn't
	// been reached. Zero or a negative value disables the timer.
	FlushInterval time.Duration
	// MaxLineBytes is the maximum size of a single queued line; lines over
	// this limit are truncated before queueing. Zero or a negative value
	// disables truncation.
	MaxLineBytes int
}

// CloudWatch returns a writer optimised for the CloudWatch Logs Insights
// format. The writer batches records in memory and flushes them in one
// underlying Write call; callers that need strict ordering with other
// writers should disable batching by leaving the options zero.
//
// The returned value also has Flush() error and Close() error methods,
// reachable through a type assertion. When opts.FlushInterval is positive a
// background goroutine performs the periodic flushes; Close must be called
// to stop it, otherwise the goroutine and the writer live forever.
func CloudWatch(dst io.Writer, opts CloudWatchOptions) io.Writer {
	w := &cloudwatch{dst: dst, opts: opts}
	if opts.FlushInterval > 0 {
		w.ticker = time.NewTicker(opts.FlushInterval)
		w.done = make(chan struct{})
		go w.loop()
	}
	return w
}

// cloudwatch is the batching writer behind CloudWatch.
type cloudwatch struct {
	dst  io.Writer
	opts CloudWatchOptions

	mu     sync.Mutex   // guards buf, queued and closed
	buf    bytes.Buffer // records accepted but not yet written to dst
	queued int          // number of records currently held in buf
	closed bool         // set by Close; later Writes are rejected

	// ticker and done are non-nil only when periodic flushing is enabled.
	// They are assigned before loop starts and never reassigned.
	ticker *time.Ticker
	done   chan struct{} // closed by Close to make loop return
}

// Write queues p as one record and flushes the batch when the FlushEvery
// threshold is met. It never returns a short write: a record truncated to
// MaxLineBytes is still reported as fully consumed, because the dropped
// tail is discarded on purpose and must not be retried by the caller.
//
// It returns (0, io.ErrClosedPipe) after Close. If the flush triggered by
// this call fails, the flush error is returned together with len(p).
func (c *cloudwatch) Write(p []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return 0, io.ErrClosedPipe
	}

	// Remember the caller's length before truncating: io.Writer requires a
	// non-nil error whenever n < len(p).
	n := len(p)
	if limit := c.opts.MaxLineBytes; limit > 0 && n > limit {
		// NOTE(review): this cuts at a byte offset, so a trailing newline
		// (or the tail of a multi-byte rune) can be dropped.
		p = p[:limit]
	}

	c.buf.Write(p)
	c.queued++

	if c.opts.FlushEvery == 0 || c.queued >= c.opts.FlushEvery {
		if err := c.flushLocked(); err != nil {
			return n, err
		}
	}
	return n, nil
}

// Flush drains the buffer to the underlying writer.
func (c *cloudwatch) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.flushLocked()
}

// flushLocked writes the whole batch to dst in a single Write call and
// empties the buffer. The caller must hold c.mu.
func (c *cloudwatch) flushLocked() error {
	pending := c.buf.Len()
	if pending == 0 {
		c.queued = 0
		return nil
	}

	n, err := c.dst.Write(c.buf.Bytes())

	// The batch is dropped even when the write fails, so a destination that
	// keeps failing cannot make the buffer grow without bound.
	c.buf.Reset()
	c.queued = 0

	if err == nil && n < pending {
		// A writer that reports a short count without an error is broken;
		// surface it instead of silently losing data.
		err = io.ErrShortWrite
	}
	return err
}

// Close stops the periodic timer, tells the background goroutine (if any)
// to exit, and flushes remaining data. Calling Close more than once is
// safe; calls after the first return nil.
func (c *cloudwatch) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	if c.ticker != nil {
		c.ticker.Stop()
		// Ticker.Stop does not close ticker.C, so loop needs an explicit
		// signal or it would block on the channel forever.
		close(c.done)
	}
	return c.flushLocked()
}

// loop performs the periodic flushes until Close closes c.done. It is only
// started when a ticker has been configured.
func (c *cloudwatch) loop() {
	for {
		select {
		case <-c.done:
			return
		case <-c.ticker.C:
			// Best effort: there is no caller to hand the error to.
			_ = c.Flush()
		}
	}
}
// Flusher lets callers manually flush the writer. CloudWatch returns a plain
// io.Writer, so callers that want to force a flush type-assert the returned
// value to Flusher first.
type Flusher interface {
	// Flush drains any buffered data and returns the error, if any,
	// reported while doing so.
	Flush() error
}