e1.5.0 Log sink options added.
This commit is contained in:
76
processor.go
76
processor.go
@ -12,21 +12,24 @@ const (
|
||||
// Threshold for triggering reactive disk check
|
||||
reactiveCheckThresholdBytes int64 = 10 * 1024 * 1024
|
||||
// Factors to adjust check interval
|
||||
adaptiveIntervalFactor float64 = 1.5 // Slow down factor
|
||||
adaptiveSpeedUpFactor float64 = 0.8 // Speed up factor
|
||||
adaptiveIntervalFactor float64 = 1.5 // Slow down
|
||||
adaptiveSpeedUpFactor float64 = 0.8 // Speed up
|
||||
)
|
||||
|
||||
// processLogs is the main log processing loop running in a separate goroutine
|
||||
func (l *Logger) processLogs(ch <-chan logRecord) {
|
||||
l.state.ProcessorExited.Store(false) // Mark processor as running
|
||||
defer l.state.ProcessorExited.Store(true) // Ensure flag is set on exit
|
||||
l.state.ProcessorExited.Store(false)
|
||||
defer l.state.ProcessorExited.Store(true)
|
||||
|
||||
// Set up timers and state variables
|
||||
timers := l.setupProcessingTimers()
|
||||
defer l.closeProcessingTimers(timers)
|
||||
|
||||
// Perform an initial disk check on startup
|
||||
l.performDiskCheck(true) // Force check and update status
|
||||
// Perform an initial disk check on startup (skip if file output is disabled)
|
||||
disableFile, _ := l.config.Bool("log.disable_file")
|
||||
if !disableFile {
|
||||
l.performDiskCheck(true)
|
||||
}
|
||||
|
||||
// Send initial heartbeats immediately instead of waiting for first tick
|
||||
heartbeatLevel, _ := l.config.Int64("log.heartbeat_level")
|
||||
@ -65,8 +68,8 @@ func (l *Logger) processLogs(ch <-chan logRecord) {
|
||||
|
||||
// Reactive Check Trigger
|
||||
if bytesSinceLastCheck > reactiveCheckThresholdBytes {
|
||||
if l.performDiskCheck(false) { // Check without forcing cleanup yet
|
||||
bytesSinceLastCheck = 0 // Reset if check OK
|
||||
if l.performDiskCheck(false) {
|
||||
bytesSinceLastCheck = 0
|
||||
logsSinceLastCheck = 0
|
||||
lastCheckTime = time.Now()
|
||||
}
|
||||
@ -78,9 +81,8 @@ func (l *Logger) processLogs(ch <-chan logRecord) {
|
||||
|
||||
case <-timers.diskCheckTicker.C:
|
||||
// Periodic disk check
|
||||
if l.performDiskCheck(true) { // Periodic check, force cleanup if needed
|
||||
if l.performDiskCheck(true) {
|
||||
l.adjustDiskCheckInterval(timers, lastCheckTime, logsSinceLastCheck)
|
||||
// Reset counters after successful periodic check
|
||||
bytesSinceLastCheck = 0
|
||||
logsSinceLastCheck = 0
|
||||
lastCheckTime = time.Now()
|
||||
@ -193,8 +195,6 @@ func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
|
||||
if intervalS <= 0 {
|
||||
intervalS = 60 // Default to 60 seconds
|
||||
}
|
||||
// Create a new ticker that's offset slightly to avoid skipping the first tick
|
||||
// by creating it and then waiting until exactly the next interval time
|
||||
timers.heartbeatTicker = time.NewTicker(time.Duration(intervalS) * time.Second)
|
||||
return timers.heartbeatTicker.C
|
||||
}
|
||||
@ -203,11 +203,14 @@ func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
|
||||
|
||||
// processLogRecord handles individual log records, returning bytes written
|
||||
func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
if !l.state.DiskStatusOK.Load() {
|
||||
// Check if the record should process this record
|
||||
disableFile, _ := l.config.Bool("log.disable_file")
|
||||
if !disableFile && !l.state.DiskStatusOK.Load() {
|
||||
l.state.DroppedLogs.Add(1)
|
||||
return 0
|
||||
}
|
||||
|
||||
// Serialize the log entry once
|
||||
format, _ := l.config.String("log.format")
|
||||
data := l.serializer.serialize(
|
||||
format,
|
||||
@ -219,6 +222,25 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
)
|
||||
dataLen := int64(len(data))
|
||||
|
||||
// Mirror to stdout if enabled
|
||||
enableStdout, _ := l.config.Bool("log.enable_stdout")
|
||||
if enableStdout {
|
||||
if s := l.state.StdoutWriter.Load(); s != nil {
|
||||
// Assert to concrete type: *sink
|
||||
if sinkWrapper, ok := s.(*sink); ok && sinkWrapper != nil {
|
||||
// Use the wrapped writer (sinkWrapper.w)
|
||||
_, _ = sinkWrapper.w.Write(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Skip file operations if file output is disabled
|
||||
if disableFile {
|
||||
l.state.TotalLogsProcessed.Add(1)
|
||||
return dataLen // Return data length for adaptive interval calculations
|
||||
}
|
||||
|
||||
// File rotation check
|
||||
currentFileSize := l.state.CurrentSize.Load()
|
||||
estimatedSize := currentFileSize + dataLen
|
||||
|
||||
@ -229,6 +251,7 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
}
|
||||
}
|
||||
|
||||
// Write to file
|
||||
cfPtr := l.state.CurrentFile.Load()
|
||||
if currentLogFile, isFile := cfPtr.(*os.File); isFile && currentLogFile != nil {
|
||||
n, err := currentLogFile.Write(data)
|
||||
@ -349,7 +372,7 @@ func (l *Logger) handleHeartbeat() {
|
||||
func (l *Logger) logProcHeartbeat() {
|
||||
processed := l.state.TotalLogsProcessed.Load()
|
||||
dropped := l.state.DroppedLogs.Load()
|
||||
sequence := l.state.HeartbeatSequence.Add(1) // Increment and get sequence number
|
||||
sequence := l.state.HeartbeatSequence.Add(1)
|
||||
|
||||
startTimeVal := l.state.LoggerStartTime.Load()
|
||||
var uptimeHours float64 = 0
|
||||
@ -436,19 +459,34 @@ func (l *Logger) logSysHeartbeat() {
|
||||
l.writeHeartbeatRecord(LevelSys, sysArgs)
|
||||
}
|
||||
|
||||
// writeHeartbeatRecord handles the common logic for writing a heartbeat record
|
||||
// writeHeartbeatRecord handles common logic for writing a heartbeat record
|
||||
func (l *Logger) writeHeartbeatRecord(level int64, args []any) {
|
||||
if l.state.LoggerDisabled.Load() || l.state.ShutdownCalled.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
if !l.state.DiskStatusOK.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// Serialize heartbeat data
|
||||
format, _ := l.config.String("log.format")
|
||||
hbData := l.serializer.serialize(format, FlagDefault|FlagShowLevel, time.Now(), level, "", args)
|
||||
|
||||
// Mirror to stdout if enabled
|
||||
enableStdout, _ := l.config.Bool("log.enable_stdout")
|
||||
if enableStdout {
|
||||
if s := l.state.StdoutWriter.Load(); s != nil {
|
||||
// Assert to concrete type: *sink
|
||||
if sinkWrapper, ok := s.(*sink); ok && sinkWrapper != nil {
|
||||
// Use the wrapped writer (sinkWrapper.w)
|
||||
_, _ = sinkWrapper.w.Write(hbData)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
disableFile, _ := l.config.Bool("log.disable_file")
|
||||
if disableFile || !l.state.DiskStatusOK.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// Write to file
|
||||
cfPtr := l.state.CurrentFile.Load()
|
||||
if cfPtr == nil {
|
||||
fmtFprintf(os.Stderr, "log: error - current file handle is nil during heartbeat\n")
|
||||
|
||||
Reference in New Issue
Block a user