mirror of
https://github.com/cwinfo/matterbridge.git
synced 2024-11-22 20:40:27 +00:00
300 lines
6.5 KiB
Go
300 lines
6.5 KiB
Go
package logr
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Target represents a destination for log records such as file,
|
|
// database, TCP socket, etc.
|
|
type Target interface {
|
|
// SetName provides an optional name for the target.
|
|
SetName(name string)
|
|
|
|
// IsLevelEnabled returns true if this target should emit
|
|
// logs for the specified level. Also determines if
|
|
// a stack trace is required.
|
|
IsLevelEnabled(Level) (enabled bool, stacktrace bool)
|
|
|
|
// Formatter returns the Formatter associated with this Target.
|
|
Formatter() Formatter
|
|
|
|
// Log outputs the log record to this target's destination.
|
|
Log(rec *LogRec)
|
|
|
|
// Shutdown makes best effort to flush target queue and
|
|
// frees/closes all resources.
|
|
Shutdown(ctx context.Context) error
|
|
}
|
|
|
|
// RecordWriter can convert a LogRecord to bytes and output to some data sink.
|
|
type RecordWriter interface {
|
|
Write(rec *LogRec) error
|
|
}
|
|
|
|
// Basic provides the basic functionality of a Target that can be used
|
|
// to more easily compose your own Targets. To use, just embed Basic
|
|
// in your target type, implement `RecordWriter`, and call `(*Basic).Start`.
|
|
type Basic struct {
|
|
target Target
|
|
|
|
filter Filter
|
|
formatter Formatter
|
|
|
|
in chan *LogRec
|
|
done chan struct{}
|
|
w RecordWriter
|
|
|
|
mux sync.RWMutex
|
|
name string
|
|
|
|
metrics bool
|
|
queueSizeGauge Gauge
|
|
loggedCounter Counter
|
|
errorCounter Counter
|
|
droppedCounter Counter
|
|
blockedCounter Counter
|
|
|
|
metricsUpdateFreqMillis int64
|
|
}
|
|
|
|
// Start initializes this target helper and starts accepting log records for processing.
|
|
func (b *Basic) Start(target Target, rw RecordWriter, filter Filter, formatter Formatter, maxQueued int) {
|
|
if filter == nil {
|
|
filter = &StdFilter{Lvl: Fatal}
|
|
}
|
|
if formatter == nil {
|
|
formatter = &DefaultFormatter{}
|
|
}
|
|
|
|
b.target = target
|
|
b.filter = filter
|
|
b.formatter = formatter
|
|
b.in = make(chan *LogRec, maxQueued)
|
|
b.done = make(chan struct{}, 1)
|
|
b.w = rw
|
|
go b.start()
|
|
|
|
if b.hasMetrics() {
|
|
go b.startMetricsUpdater()
|
|
}
|
|
}
|
|
|
|
func (b *Basic) SetName(name string) {
|
|
b.mux.Lock()
|
|
defer b.mux.Unlock()
|
|
b.name = name
|
|
}
|
|
|
|
// IsLevelEnabled returns true if this target should emit
|
|
// logs for the specified level. Also determines if
|
|
// a stack trace is required.
|
|
func (b *Basic) IsLevelEnabled(lvl Level) (enabled bool, stacktrace bool) {
|
|
return b.filter.IsEnabled(lvl), b.filter.IsStacktraceEnabled(lvl)
|
|
}
|
|
|
|
// Formatter returns the Formatter associated with this Target.
|
|
func (b *Basic) Formatter() Formatter {
|
|
return b.formatter
|
|
}
|
|
|
|
// Shutdown stops processing log records after making best
|
|
// effort to flush queue.
|
|
func (b *Basic) Shutdown(ctx context.Context) error {
|
|
// close the incoming channel and wait for read loop to exit.
|
|
close(b.in)
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-b.done:
|
|
}
|
|
|
|
// b.in channel should now be drained.
|
|
return nil
|
|
}
|
|
|
|
// Log outputs the log record to this targets destination.
|
|
func (b *Basic) Log(rec *LogRec) {
|
|
lgr := rec.Logger().Logr()
|
|
select {
|
|
case b.in <- rec:
|
|
default:
|
|
handler := lgr.OnTargetQueueFull
|
|
if handler != nil && handler(b.target, rec, cap(b.in)) {
|
|
b.incDroppedCounter()
|
|
return // drop the record
|
|
}
|
|
b.incBlockedCounter()
|
|
|
|
select {
|
|
case <-time.After(lgr.enqueueTimeout()):
|
|
lgr.ReportError(fmt.Errorf("target enqueue timeout for log rec [%v]", rec))
|
|
case b.in <- rec: // block until success or timeout
|
|
}
|
|
}
|
|
}
|
|
|
|
// Metrics enables metrics collection using the provided MetricsCollector.
|
|
func (b *Basic) EnableMetrics(collector MetricsCollector, updateFreqMillis int64) error {
|
|
name := fmt.Sprintf("%v", b)
|
|
|
|
b.mux.Lock()
|
|
defer b.mux.Unlock()
|
|
|
|
b.metrics = true
|
|
b.metricsUpdateFreqMillis = updateFreqMillis
|
|
|
|
var err error
|
|
|
|
if b.queueSizeGauge, err = collector.QueueSizeGauge(name); err != nil {
|
|
return err
|
|
}
|
|
if b.loggedCounter, err = collector.LoggedCounter(name); err != nil {
|
|
return err
|
|
}
|
|
if b.errorCounter, err = collector.ErrorCounter(name); err != nil {
|
|
return err
|
|
}
|
|
if b.droppedCounter, err = collector.DroppedCounter(name); err != nil {
|
|
return err
|
|
}
|
|
if b.blockedCounter, err = collector.BlockedCounter(name); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Basic) hasMetrics() bool {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
return b.metrics
|
|
}
|
|
|
|
func (b *Basic) setQueueSizeGauge(val float64) {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
if b.queueSizeGauge != nil {
|
|
b.queueSizeGauge.Set(val)
|
|
}
|
|
}
|
|
|
|
func (b *Basic) incLoggedCounter() {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
if b.loggedCounter != nil {
|
|
b.loggedCounter.Inc()
|
|
}
|
|
}
|
|
|
|
func (b *Basic) incErrorCounter() {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
if b.errorCounter != nil {
|
|
b.errorCounter.Inc()
|
|
}
|
|
}
|
|
|
|
func (b *Basic) incDroppedCounter() {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
if b.droppedCounter != nil {
|
|
b.droppedCounter.Inc()
|
|
}
|
|
}
|
|
|
|
func (b *Basic) incBlockedCounter() {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
if b.blockedCounter != nil {
|
|
b.blockedCounter.Inc()
|
|
}
|
|
}
|
|
|
|
// String returns a name for this target. Use `SetName` to specify a name.
|
|
func (b *Basic) String() string {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
|
|
if b.name != "" {
|
|
return b.name
|
|
}
|
|
return fmt.Sprintf("%T", b.target)
|
|
}
|
|
|
|
// Start accepts log records via In channel and writes to the
|
|
// supplied writer, until Done channel signaled.
|
|
func (b *Basic) start() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
fmt.Fprintln(os.Stderr, "Basic.start -- ", r)
|
|
go b.start()
|
|
}
|
|
}()
|
|
|
|
for rec := range b.in {
|
|
if rec.flush != nil {
|
|
b.flush(rec.flush)
|
|
} else {
|
|
err := b.w.Write(rec)
|
|
if err != nil {
|
|
b.incErrorCounter()
|
|
rec.Logger().Logr().ReportError(err)
|
|
} else {
|
|
b.incLoggedCounter()
|
|
}
|
|
}
|
|
}
|
|
close(b.done)
|
|
}
|
|
|
|
// startMetricsUpdater updates the metrics for any polled values every `MetricsUpdateFreqSecs` seconds until
|
|
// target is closed.
|
|
func (b *Basic) startMetricsUpdater() {
|
|
for {
|
|
updateFreq := b.getMetricsUpdateFreqMillis()
|
|
if updateFreq == 0 {
|
|
updateFreq = DefMetricsUpdateFreqMillis
|
|
}
|
|
if updateFreq < 250 {
|
|
updateFreq = 250 // don't peg the CPU
|
|
}
|
|
|
|
select {
|
|
case <-b.done:
|
|
return
|
|
case <-time.After(time.Duration(updateFreq) * time.Millisecond):
|
|
b.setQueueSizeGauge(float64(len(b.in)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Basic) getMetricsUpdateFreqMillis() int64 {
|
|
b.mux.RLock()
|
|
defer b.mux.RUnlock()
|
|
return b.metricsUpdateFreqMillis
|
|
}
|
|
|
|
// flush drains the queue and notifies when done.
|
|
func (b *Basic) flush(done chan<- struct{}) {
|
|
for {
|
|
var rec *LogRec
|
|
var err error
|
|
select {
|
|
case rec = <-b.in:
|
|
// ignore any redundant flush records.
|
|
if rec.flush == nil {
|
|
err = b.w.Write(rec)
|
|
if err != nil {
|
|
b.incErrorCounter()
|
|
rec.Logger().Logr().ReportError(err)
|
|
}
|
|
}
|
|
default:
|
|
done <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}
|