4
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2025-07-04 06:37:45 +00:00

Update vendor (#1265)

This commit is contained in:
Wim
2020-10-19 23:40:00 +02:00
committed by GitHub
parent 950f2759bd
commit 075a84427f
242 changed files with 22338 additions and 1486 deletions

View File

@ -4,9 +4,13 @@
package mlog
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"github.com/mattermost/logr"
)
// defaultLog manually encodes the log to STDERR, providing a basic, default logging implementation
@ -49,3 +53,43 @@ func defaultCriticalLog(msg string, fields ...Field) {
// We map critical to error in zap, so be consistent.
defaultLog("error", msg, fields...)
}
func defaultCustomLog(lvl LogLevel, msg string, fields ...Field) {
// custom log levels are only output once log targets are configured.
}
func defaultCustomMultiLog(lvl []LogLevel, msg string, fields ...Field) {
// custom log levels are only output once log targets are configured.
}
func defaultFlush(ctx context.Context) error {
return nil
}
func defaultAdvancedConfig(cfg LogTargetCfg) error {
// mlog.ConfigAdvancedConfig should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot config advanced logging on default logger")
}
func defaultAdvancedShutdown(ctx context.Context) error {
return nil
}
func defaultAddTarget(targets ...logr.Target) error {
// mlog.AddTarget should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot AddTarget on default logger")
}
func defaultRemoveTargets(ctx context.Context, f func(TargetInfo) bool) error {
// mlog.RemoveTargets should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot RemoveTargets on default logger")
}
func defaultEnableMetrics(collector logr.MetricsCollector) error {
// mlog.EnableMetrics should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot EnableMetrics on default logger")
}

View File

@ -0,0 +1,30 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import "github.com/mattermost/logr"
// onLoggerError is called when the logging system encounters an error,
// such as a target not able to write records. The targets will keep trying
// however the error will be logged with a dedicated level that can be output
// to a safe/always available target for monitoring or alerting.
func onLoggerError(err error) {
Log(LvlLogError, "advanced logging error", Err(err))
}
// onQueueFull is called when the main logger queue is full, indicating the
// volume and frequency of log record creation is too high for the queue size
// and/or the target latencies.
func onQueueFull(rec *logr.LogRec, maxQueueSize int) bool {
Log(LvlLogError, "main queue full, dropping record", Any("rec", rec))
return true // drop record
}
// onTargetQueueFull is called when the main logger queue is full, indicating the
// volume and frequency of log record creation is too high for the target's queue size
// and/or the target latency.
func onTargetQueueFull(target logr.Target, rec *logr.LogRec, maxQueueSize int) bool {
Log(LvlLogError, "target queue full, dropping record", String("target", ""), Any("rec", rec))
return true // drop record
}

View File

@ -4,6 +4,11 @@
package mlog
import (
"context"
"log"
"sync/atomic"
"github.com/mattermost/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -11,6 +16,10 @@ import (
var globalLogger *Logger
func InitGlobalLogger(logger *Logger) {
// Clean up previous instance.
if globalLogger != nil && globalLogger.logrLogger != nil {
globalLogger.logrLogger.Logr().Shutdown()
}
glob := *logger
glob.zap = glob.zap.WithOptions(zap.AddCallerSkip(1))
globalLogger = &glob
@ -19,13 +28,46 @@ func InitGlobalLogger(logger *Logger) {
Warn = globalLogger.Warn
Error = globalLogger.Error
Critical = globalLogger.Critical
Log = globalLogger.Log
LogM = globalLogger.LogM
Flush = globalLogger.Flush
ConfigAdvancedLogging = globalLogger.ConfigAdvancedLogging
ShutdownAdvancedLogging = globalLogger.ShutdownAdvancedLogging
AddTarget = globalLogger.AddTarget
RemoveTargets = globalLogger.RemoveTargets
EnableMetrics = globalLogger.EnableMetrics
}
// logWriterFunc provides access to mlog via io.Writer, so the standard logger
// can be redirected to use mlog and whatever targets are defined.
type logWriterFunc func([]byte) (int, error)
func (lw logWriterFunc) Write(p []byte) (int, error) {
return lw(p)
}
func RedirectStdLog(logger *Logger) {
zap.RedirectStdLogAt(logger.zap.With(zap.String("source", "stdlog")).WithOptions(zap.AddCallerSkip(-2)), zapcore.ErrorLevel)
if atomic.LoadInt32(&disableZap) == 0 {
zap.RedirectStdLogAt(logger.zap.With(zap.String("source", "stdlog")).WithOptions(zap.AddCallerSkip(-2)), zapcore.ErrorLevel)
return
}
writer := func(p []byte) (int, error) {
Log(LvlStdLog, string(p))
return len(p), nil
}
log.SetOutput(logWriterFunc(writer))
}
type LogFunc func(string, ...Field)
type LogFuncCustom func(LogLevel, string, ...Field)
type LogFuncCustomMulti func([]LogLevel, string, ...Field)
type FlushFunc func(context.Context) error
type ConfigFunc func(cfg LogTargetCfg) error
type ShutdownFunc func(context.Context) error
type AddTargetFunc func(...logr.Target) error
type RemoveTargetsFunc func(context.Context, func(TargetInfo) bool) error
type EnableMetricsFunc func(logr.MetricsCollector) error
// DON'T USE THIS Modify the level on the app logger
func GloballyDisableDebugLogForTest() {
@ -42,3 +84,12 @@ var Info LogFunc = defaultInfoLog
var Warn LogFunc = defaultWarnLog
var Error LogFunc = defaultErrorLog
var Critical LogFunc = defaultCriticalLog
var Log LogFuncCustom = defaultCustomLog
var LogM LogFuncCustomMulti = defaultCustomMultiLog
var Flush FlushFunc = defaultFlush
var ConfigAdvancedLogging ConfigFunc = defaultAdvancedConfig
var ShutdownAdvancedLogging ShutdownFunc = defaultAdvancedShutdown
var AddTarget AddTargetFunc = defaultAddTarget
var RemoveTargets RemoveTargetsFunc = defaultRemoveTargets
var EnableMetrics EnableMetricsFunc = defaultEnableMetrics

View File

@ -0,0 +1,39 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
// Standard levels
var (
LvlPanic = LogLevel{ID: 0, Name: "panic", Stacktrace: true}
LvlFatal = LogLevel{ID: 1, Name: "fatal", Stacktrace: true}
LvlError = LogLevel{ID: 2, Name: "error"}
LvlWarn = LogLevel{ID: 3, Name: "warn"}
LvlInfo = LogLevel{ID: 4, Name: "info"}
LvlDebug = LogLevel{ID: 5, Name: "debug"}
LvlTrace = LogLevel{ID: 6, Name: "trace"}
// used by redirected standard logger
LvlStdLog = LogLevel{ID: 10, Name: "stdlog"}
// used only by the logger
LvlLogError = LogLevel{ID: 11, Name: "logerror", Stacktrace: true}
)
// Register custom (discrete) levels here.
// !!!!! ID's must not exceed 32,768 !!!!!!
var (
// used by the audit system
LvlAuditAPI = LogLevel{ID: 100, Name: "audit-api"}
LvlAuditContent = LogLevel{ID: 101, Name: "audit-content"}
LvlAuditPerms = LogLevel{ID: 102, Name: "audit-permissions"}
LvlAuditCLI = LogLevel{ID: 103, Name: "audit-cli"}
// used by the TCP log target
LvlTcpLogTarget = LogLevel{ID: 120, Name: "TcpLogTarget"}
// add more here ...
)
// Combinations for LogM (log multi)
var (
MLvlAuditAll = []LogLevel{LvlAuditAPI, LvlAuditContent, LvlAuditPerms, LvlAuditCLI}
)

View File

@ -4,10 +4,15 @@
package mlog
import (
"context"
"fmt"
"io"
"log"
"os"
"sync/atomic"
"time"
"github.com/mattermost/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
@ -22,6 +27,19 @@ const (
LevelWarn = "warn"
// Errors are messages about things we know are problems
LevelError = "error"
// DefaultFlushTimeout is the default amount of time mlog.Flush will wait
// before timing out.
DefaultFlushTimeout = time.Second * 5
)
var (
// disableZap is set when Zap should be disabled and Logr used instead.
// This is needed for unit testing as Zap has no shutdown capabilities
// and holds file handles until process exit. Currently unit test create
// many server instances, and thus many Zap log files.
// This flag will be removed when Zap is permanently replaced.
disableZap int32
)
// Type and function aliases from zap to limit the libraries scope into MM code
@ -38,6 +56,8 @@ var NamedErr = zap.NamedError
var Bool = zap.Bool
var Duration = zap.Duration
type TargetInfo logr.TargetInfo
type LoggerConfiguration struct {
EnableConsole bool
ConsoleJson bool
@ -52,6 +72,7 @@ type Logger struct {
zap *zap.Logger
consoleLevel zap.AtomicLevel
fileLevel zap.AtomicLevel
logrLogger *logr.Logger
}
func getZapLevel(level string) zapcore.Level {
@ -84,6 +105,7 @@ func NewLogger(config *LoggerConfiguration) *Logger {
logger := &Logger{
consoleLevel: zap.NewAtomicLevelAt(getZapLevel(config.ConsoleLevel)),
fileLevel: zap.NewAtomicLevelAt(getZapLevel(config.FileLevel)),
logrLogger: newLogr(),
}
if config.EnableConsole {
@ -93,13 +115,33 @@ func NewLogger(config *LoggerConfiguration) *Logger {
}
if config.EnableFile {
writer := zapcore.AddSync(&lumberjack.Logger{
Filename: config.FileLocation,
MaxSize: 100,
Compress: true,
})
core := zapcore.NewCore(makeEncoder(config.FileJson), writer, logger.fileLevel)
cores = append(cores, core)
if atomic.LoadInt32(&disableZap) != 0 {
t := &LogTarget{
Type: "file",
Format: "json",
Levels: mlogLevelToLogrLevels(config.FileLevel),
MaxQueueSize: DefaultMaxTargetQueue,
Options: []byte(fmt.Sprintf(`{"Filename":"%s", "MaxSizeMB":%d, "Compress":%t}`,
config.FileLocation, 100, true)),
}
if !config.FileJson {
t.Format = "plain"
}
if tgt, err := NewLogrTarget("mlogFile", t); err == nil {
logger.logrLogger.Logr().AddTarget(tgt)
} else {
Error("error creating mlogFile", Err(err))
}
} else {
writer := zapcore.AddSync(&lumberjack.Logger{
Filename: config.FileLocation,
MaxSize: 100,
Compress: true,
})
core := zapcore.NewCore(makeEncoder(config.FileJson), writer, logger.fileLevel)
cores = append(cores, core)
}
}
combinedCore := zapcore.NewTee(cores...)
@ -107,7 +149,6 @@ func NewLogger(config *LoggerConfiguration) *Logger {
logger.zap = zap.New(combinedCore,
zap.AddCaller(),
)
return logger
}
@ -123,6 +164,10 @@ func (l *Logger) SetConsoleLevel(level string) {
func (l *Logger) With(fields ...Field) *Logger {
newlogger := *l
newlogger.zap = newlogger.zap.With(fields...)
if newlogger.logrLogger != nil {
ll := newlogger.logrLogger.WithFields(zapToLogr(fields))
newlogger.logrLogger = &ll
}
return &newlogger
}
@ -161,20 +206,120 @@ func (l *Logger) Sugar() *SugarLogger {
func (l *Logger) Debug(message string, fields ...Field) {
l.zap.Debug(message, fields...)
if isLevelEnabled(l.logrLogger, logr.Debug) {
l.logrLogger.WithFields(zapToLogr(fields)).Debug(message)
}
}
func (l *Logger) Info(message string, fields ...Field) {
l.zap.Info(message, fields...)
if isLevelEnabled(l.logrLogger, logr.Info) {
l.logrLogger.WithFields(zapToLogr(fields)).Info(message)
}
}
func (l *Logger) Warn(message string, fields ...Field) {
l.zap.Warn(message, fields...)
if isLevelEnabled(l.logrLogger, logr.Warn) {
l.logrLogger.WithFields(zapToLogr(fields)).Warn(message)
}
}
func (l *Logger) Error(message string, fields ...Field) {
l.zap.Error(message, fields...)
if isLevelEnabled(l.logrLogger, logr.Error) {
l.logrLogger.WithFields(zapToLogr(fields)).Error(message)
}
}
func (l *Logger) Critical(message string, fields ...Field) {
l.zap.Error(message, fields...)
if isLevelEnabled(l.logrLogger, logr.Error) {
l.logrLogger.WithFields(zapToLogr(fields)).Error(message)
}
}
func (l *Logger) Log(level LogLevel, message string, fields ...Field) {
l.logrLogger.WithFields(zapToLogr(fields)).Log(logr.Level(level), message)
}
func (l *Logger) LogM(levels []LogLevel, message string, fields ...Field) {
var logger *logr.Logger
for _, lvl := range levels {
if isLevelEnabled(l.logrLogger, logr.Level(lvl)) {
// don't create logger with fields unless at least one level is active.
if logger == nil {
l := l.logrLogger.WithFields(zapToLogr(fields))
logger = &l
}
logger.Log(logr.Level(lvl), message)
}
}
}
func (l *Logger) Flush(cxt context.Context) error {
return l.logrLogger.Logr().FlushWithTimeout(cxt)
}
// ShutdownAdvancedLogging stops the logger from accepting new log records and tries to
// flush queues within the context timeout. Once complete all targets are shutdown
// and any resources released.
func (l *Logger) ShutdownAdvancedLogging(cxt context.Context) error {
err := l.logrLogger.Logr().ShutdownWithTimeout(cxt)
l.logrLogger = newLogr()
return err
}
// ConfigAdvancedLoggingConfig (re)configures advanced logging based on the
// specified log targets. This is the easiest way to get the advanced logger
// configured via a config source such as file.
func (l *Logger) ConfigAdvancedLogging(targets LogTargetCfg) error {
if err := l.ShutdownAdvancedLogging(context.Background()); err != nil {
Error("error shutting down previous logger", Err(err))
}
err := logrAddTargets(l.logrLogger, targets)
return err
}
// AddTarget adds one or more logr.Target to the advanced logger. This is the preferred method
// to add custom targets or provide configuration that cannot be expressed via a
// config source.
func (l *Logger) AddTarget(targets ...logr.Target) error {
return l.logrLogger.Logr().AddTarget(targets...)
}
// RemoveTargets selectively removes targets that were previously added to this logger instance
// using the passed in filter function. The filter function should return true to remove the target
// and false to keep it.
func (l *Logger) RemoveTargets(ctx context.Context, f func(ti TargetInfo) bool) error {
// Use locally defined TargetInfo type so we don't spread Logr dependencies.
fc := func(tic logr.TargetInfo) bool {
return f(TargetInfo(tic))
}
return l.logrLogger.Logr().RemoveTargets(ctx, fc)
}
// EnableMetrics enables metrics collection by supplying a MetricsCollector.
// The MetricsCollector provides counters and gauges that are updated by log targets.
func (l *Logger) EnableMetrics(collector logr.MetricsCollector) error {
return l.logrLogger.Logr().SetMetricsCollector(collector)
}
// DisableZap is called to disable Zap, and Logr will be used instead. Any Logger
// instances created after this call will only use Logr.
//
// This is needed for unit testing as Zap has no shutdown capabilities
// and holds file handles until process exit. Currently unit tests create
// many server instances, and thus many Zap log file handles.
//
// This method will be removed when Zap is permanently replaced.
func DisableZap() {
atomic.StoreInt32(&disableZap, 1)
}
// EnableZap re-enables Zap such that any Logger instances created after this
// call will allow Zap targets.
func EnableZap() {
atomic.StoreInt32(&disableZap, 0)
}

View File

@ -0,0 +1,247 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"encoding/json"
"fmt"
"io"
"os"
"github.com/hashicorp/go-multierror"
"github.com/mattermost/logr"
logrFmt "github.com/mattermost/logr/format"
"github.com/mattermost/logr/target"
"go.uber.org/zap/zapcore"
)
const (
DefaultMaxTargetQueue = 1000
DefaultSysLogPort = 514
)
type LogLevel struct {
ID logr.LevelID
Name string
Stacktrace bool
}
type LogTarget struct {
Type string // one of "console", "file", "tcp", "syslog", "none".
Format string // one of "json", "plain"
Levels []LogLevel
Options json.RawMessage
MaxQueueSize int
}
type LogTargetCfg map[string]*LogTarget
type LogrCleanup func() error
func newLogr() *logr.Logger {
lgr := &logr.Logr{}
lgr.OnExit = func(int) {}
lgr.OnPanic = func(interface{}) {}
lgr.OnLoggerError = onLoggerError
lgr.OnQueueFull = onQueueFull
lgr.OnTargetQueueFull = onTargetQueueFull
logger := lgr.NewLogger()
return &logger
}
func logrAddTargets(logger *logr.Logger, targets LogTargetCfg) error {
lgr := logger.Logr()
var errs error
for name, t := range targets {
target, err := NewLogrTarget(name, t)
if err != nil {
errs = multierror.Append(err)
continue
}
if target != nil {
target.SetName(name)
lgr.AddTarget(target)
}
}
return errs
}
// NewLogrTarget creates a `logr.Target` based on a target config.
// Can be used when parsing custom config files, or when programmatically adding
// built-in targets. Use `mlog.AddTarget` to add custom targets.
func NewLogrTarget(name string, t *LogTarget) (logr.Target, error) {
formatter, err := newFormatter(name, t.Format)
if err != nil {
return nil, err
}
filter, err := newFilter(name, t.Levels)
if err != nil {
return nil, err
}
if t.MaxQueueSize == 0 {
t.MaxQueueSize = DefaultMaxTargetQueue
}
switch t.Type {
case "console":
return newConsoleTarget(name, t, filter, formatter)
case "file":
return newFileTarget(name, t, filter, formatter)
case "syslog":
return newSyslogTarget(name, t, filter, formatter)
case "tcp":
return newTCPTarget(name, t, filter, formatter)
case "none":
return nil, nil
}
return nil, fmt.Errorf("invalid type '%s' for target %s", t.Type, name)
}
func newFilter(name string, levels []LogLevel) (logr.Filter, error) {
filter := &logr.CustomFilter{}
for _, lvl := range levels {
filter.Add(logr.Level(lvl))
}
return filter, nil
}
func newFormatter(name string, format string) (logr.Formatter, error) {
switch format {
case "json", "":
return &logrFmt.JSON{}, nil
case "plain":
return &logrFmt.Plain{Delim: " | "}, nil
default:
return nil, fmt.Errorf("invalid format '%s' for target %s", format, name)
}
}
func newConsoleTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
type consoleOptions struct {
Out string `json:"Out"`
}
options := &consoleOptions{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
var w io.Writer
switch options.Out {
case "stdout", "":
w = os.Stdout
case "stderr":
w = os.Stderr
default:
return nil, fmt.Errorf("invalid out '%s' for target %s", options.Out, name)
}
newTarget := target.NewWriterTarget(filter, formatter, w, t.MaxQueueSize)
return newTarget, nil
}
func newFileTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
type fileOptions struct {
Filename string `json:"Filename"`
MaxSize int `json:"MaxSizeMB"`
MaxAge int `json:"MaxAgeDays"`
MaxBackups int `json:"MaxBackups"`
Compress bool `json:"Compress"`
}
options := &fileOptions{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
return newFileTargetWithOpts(name, t, target.FileOptions(*options), filter, formatter)
}
func newFileTargetWithOpts(name string, t *LogTarget, opts target.FileOptions, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
if opts.Filename == "" {
return nil, fmt.Errorf("missing 'Filename' option for target %s", name)
}
if err := checkFileWritable(opts.Filename); err != nil {
return nil, fmt.Errorf("error writing to 'Filename' for target %s: %w", name, err)
}
newTarget := target.NewFileTarget(filter, formatter, opts, t.MaxQueueSize)
return newTarget, nil
}
func newSyslogTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
options := &SyslogParams{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
if options.IP == "" {
return nil, fmt.Errorf("missing 'IP' option for target %s", name)
}
if options.Port == 0 {
options.Port = DefaultSysLogPort
}
return NewSyslogTarget(filter, formatter, options, t.MaxQueueSize)
}
func newTCPTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
options := &TcpParams{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
if options.IP == "" {
return nil, fmt.Errorf("missing 'IP' option for target %s", name)
}
if options.Port == 0 {
return nil, fmt.Errorf("missing 'Port' option for target %s", name)
}
return NewTcpTarget(filter, formatter, options, t.MaxQueueSize)
}
func checkFileWritable(filename string) error {
// try opening/creating the file for writing
file, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
file.Close()
return nil
}
func isLevelEnabled(logger *logr.Logger, level logr.Level) bool {
if logger == nil || logger.Logr() == nil {
return false
}
status := logger.Logr().IsLevelEnabled(level)
return status.Enabled
}
// zapToLogr converts Zap fields to Logr fields.
// This will not be needed once Logr is used for all logging.
func zapToLogr(zapFields []Field) logr.Fields {
encoder := zapcore.NewMapObjectEncoder()
for _, zapField := range zapFields {
zapField.AddTo(encoder)
}
return logr.Fields(encoder.Fields)
}
// mlogLevelToLogrLevel converts a mlog logger level to
// an array of discrete Logr levels.
func mlogLevelToLogrLevels(level string) []LogLevel {
levels := make([]LogLevel, 0)
levels = append(levels, LvlError, LvlPanic, LvlFatal, LvlStdLog)
switch level {
case LevelDebug:
levels = append(levels, LvlDebug)
fallthrough
case LevelInfo:
levels = append(levels, LvlInfo)
fallthrough
case LevelWarn:
levels = append(levels, LvlWarn)
}
return levels
}

View File

@ -0,0 +1,142 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"github.com/mattermost/logr"
"github.com/wiggin77/merror"
syslog "github.com/wiggin77/srslog"
)
// Syslog outputs log records to local or remote syslog.
type Syslog struct {
logr.Basic
w *syslog.Writer
}
// SyslogParams provides parameters for dialing a syslog daemon.
type SyslogParams struct {
IP string `json:"IP"`
Port int `json:"Port"`
Tag string `json:"Tag"`
TLS bool `json:"TLS"`
Cert string `json:"Cert"`
Insecure bool `json:"Insecure"`
}
// NewSyslogTarget creates a target capable of outputting log records to remote or local syslog, with or without TLS.
func NewSyslogTarget(filter logr.Filter, formatter logr.Formatter, params *SyslogParams, maxQueue int) (*Syslog, error) {
network := "tcp"
var config *tls.Config
if params.TLS {
network = "tcp+tls"
config = &tls.Config{InsecureSkipVerify: params.Insecure}
if params.Cert != "" {
pool, err := getCertPool(params.Cert)
if err != nil {
return nil, err
}
config.RootCAs = pool
}
}
raddr := fmt.Sprintf("%s:%d", params.IP, params.Port)
writer, err := syslog.DialWithTLSConfig(network, raddr, syslog.LOG_INFO, params.Tag, config)
if err != nil {
return nil, err
}
s := &Syslog{w: writer}
s.Basic.Start(s, s, filter, formatter, maxQueue)
return s, nil
}
// Shutdown stops processing log records after making best effort to flush queue.
func (s *Syslog) Shutdown(ctx context.Context) error {
errs := merror.New()
err := s.Basic.Shutdown(ctx)
errs.Append(err)
err = s.w.Close()
errs.Append(err)
return errs.ErrorOrNil()
}
// getCertPool returns a x509.CertPool containing the cert(s)
// from `cert`, which can be a path to a .pem or .crt file,
// or a base64 encoded cert.
func getCertPool(cert string) (*x509.CertPool, error) {
if cert == "" {
return nil, errors.New("no cert provided")
}
// first treat as a file and try to read.
serverCert, err := ioutil.ReadFile(cert)
if err != nil {
// maybe it's a base64 encoded cert
serverCert, err = base64.StdEncoding.DecodeString(cert)
if err != nil {
return nil, errors.New("cert cannot be read")
}
}
pool := x509.NewCertPool()
if ok := pool.AppendCertsFromPEM(serverCert); ok {
return pool, nil
}
return nil, errors.New("cannot parse cert")
}
// Write converts the log record to bytes, via the Formatter,
// and outputs to syslog.
func (s *Syslog) Write(rec *logr.LogRec) error {
_, stacktrace := s.IsLevelEnabled(rec.Level())
buf := rec.Logger().Logr().BorrowBuffer()
defer rec.Logger().Logr().ReleaseBuffer(buf)
buf, err := s.Formatter().Format(rec, stacktrace, buf)
if err != nil {
return err
}
txt := buf.String()
switch rec.Level() {
case logr.Panic, logr.Fatal:
err = s.w.Crit(txt)
case logr.Error:
err = s.w.Err(txt)
case logr.Warn:
err = s.w.Warning(txt)
case logr.Debug, logr.Trace:
err = s.w.Debug(txt)
default:
// logr.Info plus all custom levels.
err = s.w.Info(txt)
}
if err != nil {
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("syslog write fail: %w", err))
// syslog writer will try to reconnect.
}
return err
}
// String returns a string representation of this target.
func (s *Syslog) String() string {
return "SyslogTarget"
}

View File

@ -0,0 +1,274 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/mattermost/logr"
_ "net/http/pprof"
)
const (
DialTimeoutSecs = 30
WriteTimeoutSecs = 30
RetryBackoffMillis int64 = 100
MaxRetryBackoffMillis int64 = 30 * 1000 // 30 seconds
)
// Tcp outputs log records to raw socket server.
type Tcp struct {
logr.Basic
params *TcpParams
addy string
mutex sync.Mutex
conn net.Conn
monitor chan struct{}
shutdown chan struct{}
}
// TcpParams provides parameters for dialing a socket server.
type TcpParams struct {
IP string `json:"IP"`
Port int `json:"Port"`
TLS bool `json:"TLS"`
Cert string `json:"Cert"`
Insecure bool `json:"Insecure"`
}
// NewTcpTarget creates a target capable of outputting log records to a raw socket, with or without TLS.
func NewTcpTarget(filter logr.Filter, formatter logr.Formatter, params *TcpParams, maxQueue int) (*Tcp, error) {
tcp := &Tcp{
params: params,
addy: fmt.Sprintf("%s:%d", params.IP, params.Port),
monitor: make(chan struct{}),
shutdown: make(chan struct{}),
}
tcp.Basic.Start(tcp, tcp, filter, formatter, maxQueue)
return tcp, nil
}
// getConn provides a net.Conn. If a connection already exists, it is returned immediately,
// otherwise this method blocks until a new connection is created, timeout or shutdown.
func (tcp *Tcp) getConn() (net.Conn, error) {
tcp.mutex.Lock()
defer tcp.mutex.Unlock()
Log(LvlTcpLogTarget, "getConn enter", String("addy", tcp.addy))
defer Log(LvlTcpLogTarget, "getConn exit", String("addy", tcp.addy))
if tcp.conn != nil {
Log(LvlTcpLogTarget, "reusing existing conn", String("addy", tcp.addy)) // use "With" once Zap is removed
return tcp.conn, nil
}
type result struct {
conn net.Conn
err error
}
connChan := make(chan result)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*DialTimeoutSecs)
defer cancel()
go func(ctx context.Context, ch chan result) {
Log(LvlTcpLogTarget, "dailing", String("addy", tcp.addy))
conn, err := tcp.dial(ctx)
if err == nil {
tcp.conn = conn
tcp.monitor = make(chan struct{})
go monitor(tcp.conn, tcp.monitor)
}
connChan <- result{conn: conn, err: err}
}(ctx, connChan)
select {
case <-tcp.shutdown:
return nil, errors.New("shutdown")
case res := <-connChan:
return res.conn, res.err
}
}
// dial connects to a TCP socket, and optionally performs a TLS handshake.
// A non-nil context must be provided which can cancel the dial.
func (tcp *Tcp) dial(ctx context.Context) (net.Conn, error) {
var dialer net.Dialer
dialer.Timeout = time.Second * DialTimeoutSecs
conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", tcp.params.IP, tcp.params.Port))
if err != nil {
return nil, err
}
if !tcp.params.TLS {
return conn, nil
}
Log(LvlTcpLogTarget, "TLS handshake", String("addy", tcp.addy))
tlsconfig := &tls.Config{
ServerName: tcp.params.IP,
InsecureSkipVerify: tcp.params.Insecure,
}
if tcp.params.Cert != "" {
pool, err := getCertPool(tcp.params.Cert)
if err != nil {
return nil, err
}
tlsconfig.RootCAs = pool
}
tlsConn := tls.Client(conn, tlsconfig)
if err := tlsConn.Handshake(); err != nil {
return nil, err
}
return tlsConn, nil
}
func (tcp *Tcp) close() error {
tcp.mutex.Lock()
defer tcp.mutex.Unlock()
var err error
if tcp.conn != nil {
Log(LvlTcpLogTarget, "closing connection", String("addy", tcp.addy))
close(tcp.monitor)
err = tcp.conn.Close()
tcp.conn = nil
}
return err
}
// Shutdown stops processing log records after making best effort to flush queue.
func (tcp *Tcp) Shutdown(ctx context.Context) error {
errs := &multierror.Error{}
Log(LvlTcpLogTarget, "shutting down", String("addy", tcp.addy))
if err := tcp.Basic.Shutdown(ctx); err != nil {
errs = multierror.Append(errs, err)
}
if err := tcp.close(); err != nil {
errs = multierror.Append(errs, err)
}
close(tcp.shutdown)
return errs.ErrorOrNil()
}
// Write converts the log record to bytes, via the Formatter, and outputs to the socket.
// Called by dedicated target goroutine and will block until success or shutdown.
func (tcp *Tcp) Write(rec *logr.LogRec) error {
_, stacktrace := tcp.IsLevelEnabled(rec.Level())
buf := rec.Logger().Logr().BorrowBuffer()
defer rec.Logger().Logr().ReleaseBuffer(buf)
buf, err := tcp.Formatter().Format(rec, stacktrace, buf)
if err != nil {
return err
}
try := 1
backoff := RetryBackoffMillis
for {
select {
case <-tcp.shutdown:
return err
default:
}
conn, err := tcp.getConn()
if err != nil {
Log(LvlTcpLogTarget, "failed getting connection", String("addy", tcp.addy), Err(err))
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("log target %s connection error: %w", tcp.String(), err))
backoff = tcp.sleep(backoff)
continue
}
conn.SetWriteDeadline(time.Now().Add(time.Second * WriteTimeoutSecs))
_, err = buf.WriteTo(conn)
if err == nil {
return nil
}
Log(LvlTcpLogTarget, "write error", String("addy", tcp.addy), Err(err))
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("log target %s write error: %w", tcp.String(), err))
_ = tcp.close()
backoff = tcp.sleep(backoff)
try++
Log(LvlTcpLogTarget, "retrying write", String("addy", tcp.addy), Int("try", try))
}
}
// monitor continuously tries to read from the connection to detect socket close.
// This is needed because TCP target uses a write only socket and Linux systems
// take a long time to detect a loss of connectivity on a socket when only writing;
// the writes simply fail without an error returned.
func monitor(conn net.Conn, done <-chan struct{}) {
addy := conn.RemoteAddr().String()
defer Log(LvlTcpLogTarget, "monitor exiting", String("addy", addy))
buf := make([]byte, 1)
for {
Log(LvlTcpLogTarget, "monitor loop", String("addy", addy))
select {
case <-done:
return
case <-time.After(1 * time.Second):
}
err := conn.SetReadDeadline(time.Now().Add(time.Second * 30))
if err != nil {
continue
}
_, err = conn.Read(buf)
if errt, ok := err.(net.Error); ok && errt.Timeout() {
// read timeout is expected, keep looping.
continue
}
// Any other error closes the connection, forcing a reconnect.
Log(LvlTcpLogTarget, "monitor closing connection", Err(err))
conn.Close()
return
}
}
// String returns a string representation of this target.
func (tcp *Tcp) String() string {
return fmt.Sprintf("TcpTarget[%s:%d]", tcp.params.IP, tcp.params.Port)
}
func (tcp *Tcp) sleep(backoff int64) int64 {
select {
case <-tcp.shutdown:
case <-time.After(time.Millisecond * time.Duration(backoff)):
}
nextBackoff := backoff + (backoff >> 1)
if nextBackoff > MaxRetryBackoffMillis {
nextBackoff = MaxRetryBackoffMillis
}
return nextBackoff
}

View File

@ -0,0 +1,43 @@
-----BEGIN CERTIFICATE-----
MIIDjzCCAnegAwIBAgIRAPYfRSwdzKopBKxYxKqslJUwDQYJKoZIhvcNAQELBQAw
JzElMCMGA1UEAwwcTWF0dGVybW9zdCwgSW5jLiBJbnRlcm5hbCBDQTAeFw0xOTAz
MjIwMDE0MTVaFw0yMjAzMDYwMDE0MTVaMDsxOTA3BgNVBAMTME1hdHRlcm1vc3Qs
IEluYy4gSW50ZXJuYWwgSW50ZXJtZWRpYXRlIEF1dGhvcml0eTCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBAMjliRdmvnNL4u/Jr/M2dPwQmTJXEBY/Vq9Q
vAU52X3tRMCPxcaFz+x6ftuvdO2NdohXGAmtx9QU5LZcvFeTDpoVEBo9A+4jtLvD
DZYaTNLpJmoSoJHaDbdWX+OAOqyDiWS741LuiMKWHhew9QOisat2ZINPxjmAd9wE
xthTMgzsv7MUqnMer8U5OGQ0Qy7wAmNRc+2K3qPwkxe2RUvcte50DUFNgxEginsh
vrkOXR383vUCZfu72qu8oggjiQpyTllu5je2Ap6JLjYLkEMiMqrYADuWor/ZHwa6
WrFqVETxWfAV5u9Eh0wZM/KKYwRQuw9y+Nans77FmUl1tVWWNN8CAwEAAaOBoTCB
njAMBgNVHRMEBTADAQH/MB0GA1UdDgQWBBQY4Uqswyr2hO/HetZt2RDxJdTIPjBi
BgNVHSMEWzBZgBRFZXVg2Z5tNIsWeWjBLEy2yzKbMKErpCkwJzElMCMGA1UEAwwc
TWF0dGVybW9zdCwgSW5jLiBJbnRlcm5hbCBDQYIUEifGUOM+bIFZo1tkjZB5YGBr
0xEwCwYDVR0PBAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQAEdexL30Q0zBHmPAH8
LhdK7dbzW1CmILbxRZlKAwRN+hKRXiMW3MHIkhNuoV9Aev602Q+ja4lWsRi/ktOL
ni1FWx5gSScgdG8JGj47dOmoT3vXKX7+umiv4rQLPDl9/DKMuv204OYJq6VT+uNU
6C6kL157jGJEO76H4fMZ8oYsD7Sq0zjiNKtuCYii0ngH3j3gB1jACLqRgveU7MdT
pqOV2KfY31+h8VBtkUvljNztQ9xNY8Fjmt0SMf7E3FaUcaar3ZCr70G5aU3dKbe7
47vGOBa5tCqw4YK0jgDKid3IJQul9a3J1mSsH8Wy3to9cAV4KGZBQLnzCX15a/+v
3yVh
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDfjCCAmagAwIBAgIUEifGUOM+bIFZo1tkjZB5YGBr0xEwDQYJKoZIhvcNAQEL
BQAwJzElMCMGA1UEAwwcTWF0dGVybW9zdCwgSW5jLiBJbnRlcm5hbCBDQTAeFw0x
OTAzMjEyMTI4NDNaFw0yOTAzMTgyMTI4NDNaMCcxJTAjBgNVBAMMHE1hdHRlcm1v
c3QsIEluYy4gSW50ZXJuYWwgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDH0Xq5rMBGpKOVWTpb5MnaJIWFP/vOtvEk+7hVrfOfe1/5x0Kk3UgAHj85
otaEZD1Lhn/JLkEqCiE/UXMJFwJDlNcO4CkdKBSpYX4bKAqy5q/X3QwioMSNpJG1
+YYrNGBH0sgKcKjyCaLhmqYLD0xZDVOmWIYBU9jUPyXw5U0tnsVrTqGMxVkm1xCY
krCWN1ZoUrLvL0MCZc5qpxoPTopr9UO9cqSBSuy6BVWVuEWBZhpqHt+ul8VxhzzY
q1k4l7r2qw+/wm1iJBedTeBVeWNag8JaVfLgu+/W7oJVlPO32Po7pnvHp8iJ3b4K
zXyVHaTX4S6Em+6LV8855TYrShzlAgMBAAGjgaEwgZ4wHQYDVR0OBBYEFEVldWDZ
nm00ixZ5aMEsTLbLMpswMGIGA1UdIwRbMFmAFEVldWDZnm00ixZ5aMEsTLbLMpsw
oSukKTAnMSUwIwYDVQQDDBxNYXR0ZXJtb3N0LCBJbmMuIEludGVybmFsIENBghQS
J8ZQ4z5sgVmjW2SNkHlgYGvTETAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIBBjAN
BgkqhkiG9w0BAQsFAAOCAQEAPiCWFmopyAkY2T3Zyo4yaRPhX1+VOTMKJtY6EUhq
/GHz6kzEyvCUBf0N892cibGxekrEoItY9NqO6RQRfowg+Gn5kc13z4NyL2W8/eoT
Xy0ZvfaQbU++fQ6pVtWtMblDMU9xiYd7/MDvJpO328l1Vhcdp8kEi+lCvpy0sCRc
PxzPhbgCMAbZEGx+4TMQd4SZKzlRxW/2fflpReh6v1Dv0VDUSYQWwsUnaLpdKHfh
a5k0vuySYcszE4YKlY0zakeFlJfp7fBp1xTwcdW8aTfw15EicPMwTc6xxA4JJUJx
cddu817n1nayK5u6r9Qh1oIVkr0nC9YELMMy4dpPgJ88SA==
-----END CERTIFICATE-----

View File

@ -32,9 +32,10 @@ func NewTestingLogger(tb testing.TB, writer io.Writer) *Logger {
testingLogger := &Logger{
consoleLevel: zap.NewAtomicLevelAt(getZapLevel("debug")),
fileLevel: zap.NewAtomicLevelAt(getZapLevel("info")),
logrLogger: newLogr(),
}
logWriterCore := zapcore.NewCore(makeEncoder(true), logWriterSync, testingLogger.consoleLevel)
logWriterCore := zapcore.NewCore(makeEncoder(true), zapcore.Lock(logWriterSync), testingLogger.consoleLevel)
testingLogger.zap = zap.New(logWriterCore,
zap.AddCaller(),