package logr import ( "bytes" "context" "errors" "fmt" "os" "sync" "sync/atomic" "time" "github.com/wiggin77/merror" ) // Logr maintains a list of log targets and accepts incoming // log records. Use `New` to create instances. type Logr struct { tmux sync.RWMutex // targetHosts mutex targetHosts []*TargetHost in chan *LogRec quit chan struct{} // closed by Shutdown to exit read loop done chan struct{} // closed when read loop exited lvlCache levelCache bufferPool sync.Pool options *options metricsMux sync.RWMutex metrics *metrics shutdown int32 } // New creates a new Logr instance with one or more options specified. // Some options with invalid values can cause an error to be returned, // however `logr.New()` using just defaults never errors. func New(opts ...Option) (*Logr, error) { options := &options{ maxQueueSize: DefaultMaxQueueSize, enqueueTimeout: DefaultEnqueueTimeout, shutdownTimeout: DefaultShutdownTimeout, flushTimeout: DefaultFlushTimeout, maxPooledBuffer: DefaultMaxPooledBuffer, } lgr := &Logr{options: options} // apply the options for _, opt := range opts { if err := opt(lgr); err != nil { return nil, err } } pkgName := GetLogrPackageName() if pkgName != "" { opt := StackFilter(pkgName, pkgName+"/targets", pkgName+"/formatters") _ = opt(lgr) } lgr.in = make(chan *LogRec, lgr.options.maxQueueSize) lgr.quit = make(chan struct{}) lgr.done = make(chan struct{}) if lgr.options.useSyncMapLevelCache { lgr.lvlCache = &syncMapLevelCache{} } else { lgr.lvlCache = &arrayLevelCache{} } lgr.lvlCache.setup() lgr.bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } lgr.initMetrics(lgr.options.metricsCollector, lgr.options.metricsUpdateFreqMillis) go lgr.start() return lgr, nil } // AddTarget adds a target to the logger which will receive // log records for outputting. func (lgr *Logr) AddTarget(target Target, name string, filter Filter, formatter Formatter, maxQueueSize int) error { if lgr.IsShutdown() { return fmt.Errorf("AddTarget called after Logr shut down") } lgr.metricsMux.RLock() metrics := lgr.metrics lgr.metricsMux.RUnlock() hostOpts := targetHostOptions{ name: name, filter: filter, formatter: formatter, maxQueueSize: maxQueueSize, metrics: metrics, } host, err := newTargetHost(target, hostOpts) if err != nil { return err } lgr.tmux.Lock() defer lgr.tmux.Unlock() lgr.targetHosts = append(lgr.targetHosts, host) lgr.ResetLevelCache() return nil } // NewLogger creates a Logger using defaults. A `Logger` is light-weight // enough to create on-demand, but typically one or more Loggers are // created and re-used. func (lgr *Logr) NewLogger() Logger { logger := Logger{lgr: lgr} return logger } var levelStatusDisabled = LevelStatus{} // IsLevelEnabled returns true if at least one target has the specified // level enabled. The result is cached so that subsequent checks are fast. func (lgr *Logr) IsLevelEnabled(lvl Level) LevelStatus { // No levels enabled after shutdown if atomic.LoadInt32(&lgr.shutdown) != 0 { return levelStatusDisabled } // Check cache. status, ok := lgr.lvlCache.get(lvl.ID) if ok { return status } status = LevelStatus{} // Cache miss; check each target. lgr.tmux.RLock() defer lgr.tmux.RUnlock() for _, host := range lgr.targetHosts { enabled, level := host.IsLevelEnabled(lvl) if enabled { status.Enabled = true if level.Stacktrace || host.formatter.IsStacktraceNeeded() { status.Stacktrace = true break // if both level and stacktrace enabled then no sense checking more targets } } } // Cache and return the result. if err := lgr.lvlCache.put(lvl.ID, status); err != nil { lgr.ReportError(err) return LevelStatus{} } return status } // HasTargets returns true only if at least one target exists within the lgr. func (lgr *Logr) HasTargets() bool { lgr.tmux.RLock() defer lgr.tmux.RUnlock() return len(lgr.targetHosts) > 0 } // TargetInfo provides name and type for a Target. type TargetInfo struct { Name string Type string } // TargetInfos enumerates all the targets added to this lgr. // The resulting slice represents a snapshot at time of calling. func (lgr *Logr) TargetInfos() []TargetInfo { infos := make([]TargetInfo, 0) lgr.tmux.RLock() defer lgr.tmux.RUnlock() for _, host := range lgr.targetHosts { inf := TargetInfo{ Name: host.String(), Type: fmt.Sprintf("%T", host.target), } infos = append(infos, inf) } return infos } // RemoveTargets safely removes one or more targets based on the filtering method. // f should return true to delete the target, false to keep it. // When removing a target, best effort is made to write any queued log records before // closing, with cxt determining how much time can be spent in total. // Note, keep the timeout short since this method blocks certain logging operations. func (lgr *Logr) RemoveTargets(cxt context.Context, f func(ti TargetInfo) bool) error { errs := merror.New() hosts := make([]*TargetHost, 0) lgr.tmux.Lock() defer lgr.tmux.Unlock() for _, host := range lgr.targetHosts { inf := TargetInfo{ Name: host.String(), Type: fmt.Sprintf("%T", host.target), } if f(inf) { if err := host.Shutdown(cxt); err != nil { errs.Append(err) } } else { hosts = append(hosts, host) } } lgr.targetHosts = hosts lgr.ResetLevelCache() return errs.ErrorOrNil() } // ResetLevelCache resets the cached results of `IsLevelEnabled`. This is // called any time a Target is added or a target's level is changed. func (lgr *Logr) ResetLevelCache() { lgr.lvlCache.clear() } // SetMetricsCollector sets (or resets) the metrics collector to be used for gathering // metrics for all targets. Only targets added after this call will use the collector. // // To ensure all targets use a collector, use the `SetMetricsCollector` option when // creating the Logr instead, or configure/reconfigure the Logr after calling this method. func (lgr *Logr) SetMetricsCollector(collector MetricsCollector, updateFreqMillis int64) { lgr.initMetrics(collector, updateFreqMillis) } // enqueue adds a log record to the logr queue. If the queue is full then // this function either blocks or the log record is dropped, depending on // the result of calling `OnQueueFull`. func (lgr *Logr) enqueue(rec *LogRec) { select { case lgr.in <- rec: default: if lgr.options.onQueueFull != nil && lgr.options.onQueueFull(rec, cap(lgr.in)) { return // drop the record } select { case <-time.After(lgr.options.enqueueTimeout): lgr.ReportError(fmt.Errorf("enqueue timed out for log rec [%v]", rec)) case lgr.in <- rec: // block until success or timeout } } } // Flush blocks while flushing the logr queue and all target queues, by // writing existing log records to valid targets. // Any attempts to add new log records will block until flush is complete. // `logr.FlushTimeout` determines how long flush can execute before // timing out. Use `IsTimeoutError` to determine if the returned error is // due to a timeout. func (lgr *Logr) Flush() error { ctx, cancel := context.WithTimeout(context.Background(), lgr.options.flushTimeout) defer cancel() return lgr.FlushWithTimeout(ctx) } // Flush blocks while flushing the logr queue and all target queues, by // writing existing log records to valid targets. // Any attempts to add new log records will block until flush is complete. // Use `IsTimeoutError` to determine if the returned error is // due to a timeout. func (lgr *Logr) FlushWithTimeout(ctx context.Context) error { if !lgr.HasTargets() { return nil } if lgr.IsShutdown() { return errors.New("Flush called on shut down Logr") } rec := newFlushLogRec(lgr.NewLogger()) lgr.enqueue(rec) select { case <-ctx.Done(): return newTimeoutError("logr queue flush timeout") case <-rec.flush: } return nil } // IsShutdown returns true if this Logr instance has been shut down. // No further log records can be enqueued and no targets added after // shutdown. func (lgr *Logr) IsShutdown() bool { return atomic.LoadInt32(&lgr.shutdown) != 0 } // Shutdown cleanly stops the logging engine after making best efforts // to flush all targets. Call this function right before application // exit - logr cannot be restarted once shut down. // `logr.ShutdownTimeout` determines how long shutdown can execute before // timing out. Use `IsTimeoutError` to determine if the returned error is // due to a timeout. func (lgr *Logr) Shutdown() error { ctx, cancel := context.WithTimeout(context.Background(), lgr.options.shutdownTimeout) defer cancel() return lgr.ShutdownWithTimeout(ctx) } // Shutdown cleanly stops the logging engine after making best efforts // to flush all targets. Call this function right before application // exit - logr cannot be restarted once shut down. // Use `IsTimeoutError` to determine if the returned error is due to a // timeout. func (lgr *Logr) ShutdownWithTimeout(ctx context.Context) error { if err := lgr.FlushWithTimeout(ctx); err != nil { return err } if atomic.SwapInt32(&lgr.shutdown, 1) != 0 { return errors.New("Shutdown called again after shut down") } lgr.ResetLevelCache() lgr.stopMetricsUpdater() close(lgr.quit) errs := merror.New() // Wait for read loop to exit select { case <-ctx.Done(): errs.Append(newTimeoutError("logr queue shutdown timeout")) case <-lgr.done: } // logr.in channel should now be drained to targets and no more log records // can be added. lgr.tmux.RLock() defer lgr.tmux.RUnlock() for _, host := range lgr.targetHosts { err := host.Shutdown(ctx) if err != nil { errs.Append(err) } } return errs.ErrorOrNil() } // ReportError is used to notify the host application of any internal logging errors. // If `OnLoggerError` is not nil, it is called with the error, otherwise the error is // output to `os.Stderr`. func (lgr *Logr) ReportError(err interface{}) { lgr.incErrorCounter() if lgr.options.onLoggerError == nil { fmt.Fprintln(os.Stderr, err) return } lgr.options.onLoggerError(fmt.Errorf("%v", err)) } // BorrowBuffer borrows a buffer from the pool. Release the buffer to reduce garbage collection. func (lgr *Logr) BorrowBuffer() *bytes.Buffer { if lgr.options.disableBufferPool { return &bytes.Buffer{} } return lgr.bufferPool.Get().(*bytes.Buffer) } // ReleaseBuffer returns a buffer to the pool to reduce garbage collection. The buffer is only // retained if less than MaxPooledBuffer. func (lgr *Logr) ReleaseBuffer(buf *bytes.Buffer) { if !lgr.options.disableBufferPool && buf.Cap() < lgr.options.maxPooledBuffer { buf.Reset() lgr.bufferPool.Put(buf) } } // start selects on incoming log records until shutdown record is received. // Incoming log records are fanned out to all log targets. func (lgr *Logr) start() { defer func() { if r := recover(); r != nil { lgr.ReportError(r) go lgr.start() } else { close(lgr.done) } }() for { var rec *LogRec select { case rec = <-lgr.in: if rec.flush != nil { lgr.flush(rec.flush) } else { rec.prep() lgr.fanout(rec) } case <-lgr.quit: return } } } // fanout pushes a LogRec to all targets. func (lgr *Logr) fanout(rec *LogRec) { var host *TargetHost defer func() { if r := recover(); r != nil { lgr.ReportError(fmt.Errorf("fanout failed for target %s, %v", host.String(), r)) } }() var logged bool lgr.tmux.RLock() defer lgr.tmux.RUnlock() for _, host = range lgr.targetHosts { if enabled, _ := host.IsLevelEnabled(rec.Level()); enabled { host.Log(rec) logged = true } } if logged { lgr.incLoggedCounter() } } // flush drains the queue and notifies when done. func (lgr *Logr) flush(done chan<- struct{}) { // first drain the logr queue. loop: for { var rec *LogRec select { case rec = <-lgr.in: if rec.flush == nil { rec.prep() lgr.fanout(rec) } default: break loop } } logger := lgr.NewLogger() // drain all the targets; block until finished. lgr.tmux.RLock() defer lgr.tmux.RUnlock() for _, host := range lgr.targetHosts { rec := newFlushLogRec(logger) host.Log(rec) <-rec.flush } done <- struct{}{} }