// FILE: processor.go package log import ( "fmt" "os" "runtime" "time" ) const ( // Threshold for triggering reactive disk check reactiveCheckThresholdBytes int64 = 10 * 1024 * 1024 // Factors to adjust check interval adaptiveIntervalFactor float64 = 1.5 // Slow down adaptiveSpeedUpFactor float64 = 0.8 // Speed up // Minimum wait time used throughout the package minWaitTime = 10 * time.Millisecond ) // processLogs is the main log processing loop running in a separate goroutine func (l *Logger) processLogs(ch <-chan logRecord) { l.state.ProcessorExited.Store(false) defer l.state.ProcessorExited.Store(true) // Set up timers and state variables timers := l.setupProcessingTimers() defer l.closeProcessingTimers(timers) c := l.getConfig() // Perform an initial disk check on startup (skip if file output is disabled) if !c.DisableFile { l.performDiskCheck(true) } // Send initial heartbeats immediately instead of waiting for first tick heartbeatLevel := c.HeartbeatLevel if heartbeatLevel > 0 { if heartbeatLevel >= 1 { l.logProcHeartbeat() } if heartbeatLevel >= 2 { l.logDiskHeartbeat() } if heartbeatLevel >= 3 { l.logSysHeartbeat() } } // State variables for adaptive disk checks var bytesSinceLastCheck int64 = 0 var lastCheckTime = time.Now() var logsSinceLastCheck int64 = 0 // --- Main Loop --- for { select { case record, ok := <-ch: if !ok { l.performSync() return } // Process the received log record bytesWritten := l.processLogRecord(record) if bytesWritten > 0 { // Update adaptive check counters bytesSinceLastCheck += bytesWritten logsSinceLastCheck++ // Reactive Check Trigger if bytesSinceLastCheck > reactiveCheckThresholdBytes { if l.performDiskCheck(false) { bytesSinceLastCheck = 0 logsSinceLastCheck = 0 lastCheckTime = time.Now() } } } case <-timers.flushTicker.C: l.handleFlushTick() case <-timers.diskCheckTicker.C: // Periodic disk check if l.performDiskCheck(true) { l.adjustDiskCheckInterval(timers, lastCheckTime, logsSinceLastCheck) bytesSinceLastCheck = 0 logsSinceLastCheck = 0 lastCheckTime = time.Now() } case confirmChan := <-l.state.flushRequestChan: l.handleFlushRequest(confirmChan) case <-timers.retentionChan: l.handleRetentionCheck() case <-timers.heartbeatChan: l.handleHeartbeat() } } } // TimerSet holds all timers used in processLogs type TimerSet struct { flushTicker *time.Ticker diskCheckTicker *time.Ticker retentionTicker *time.Ticker heartbeatTicker *time.Ticker retentionChan <-chan time.Time heartbeatChan <-chan time.Time } // setupProcessingTimers creates and configures all necessary timers for the processor func (l *Logger) setupProcessingTimers() *TimerSet { timers := &TimerSet{} c := l.getConfig() // Set up flush timer flushInterval := c.FlushIntervalMs if flushInterval <= 0 { flushInterval = DefaultConfig().FlushIntervalMs } timers.flushTicker = time.NewTicker(time.Duration(flushInterval) * time.Millisecond) // Set up retention timer if enabled timers.retentionChan = l.setupRetentionTimer(timers) // Set up disk check timer timers.diskCheckTicker = l.setupDiskCheckTimer() // Set up heartbeat timer timers.heartbeatChan = l.setupHeartbeatTimer(timers) return timers } // closeProcessingTimers stops all active timers func (l *Logger) closeProcessingTimers(timers *TimerSet) { timers.flushTicker.Stop() if timers.diskCheckTicker != nil { timers.diskCheckTicker.Stop() } if timers.retentionTicker != nil { timers.retentionTicker.Stop() } if timers.heartbeatTicker != nil { timers.heartbeatTicker.Stop() } } // setupRetentionTimer configures the retention check timer if retention is enabled func (l *Logger) setupRetentionTimer(timers *TimerSet) <-chan time.Time { c := l.getConfig() retentionPeriodHrs := c.RetentionPeriodHrs retentionCheckMins := c.RetentionCheckMins retentionDur := time.Duration(retentionPeriodHrs * float64(time.Hour)) retentionCheckInterval := time.Duration(retentionCheckMins * float64(time.Minute)) if retentionDur > 0 && retentionCheckInterval > 0 { timers.retentionTicker = time.NewTicker(retentionCheckInterval) l.updateEarliestFileTime() // Initial check return timers.retentionTicker.C } return nil } // setupDiskCheckTimer configures the disk check timer func (l *Logger) setupDiskCheckTimer() *time.Ticker { c := l.getConfig() diskCheckIntervalMs := c.DiskCheckIntervalMs if diskCheckIntervalMs <= 0 { diskCheckIntervalMs = 5000 } currentDiskCheckInterval := time.Duration(diskCheckIntervalMs) * time.Millisecond // Ensure initial interval respects bounds minCheckIntervalMs := c.MinCheckIntervalMs maxCheckIntervalMs := c.MaxCheckIntervalMs minCheckInterval := time.Duration(minCheckIntervalMs) * time.Millisecond maxCheckInterval := time.Duration(maxCheckIntervalMs) * time.Millisecond if currentDiskCheckInterval < minCheckInterval { currentDiskCheckInterval = minCheckInterval } if currentDiskCheckInterval > maxCheckInterval { currentDiskCheckInterval = maxCheckInterval } return time.NewTicker(currentDiskCheckInterval) } // setupHeartbeatTimer configures the heartbeat timer if heartbeats are enabled func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time { c := l.getConfig() heartbeatLevel := c.HeartbeatLevel if heartbeatLevel > 0 { intervalS := c.HeartbeatIntervalS // Make sure interval is positive if intervalS <= 0 { intervalS = DefaultConfig().HeartbeatIntervalS } timers.heartbeatTicker = time.NewTicker(time.Duration(intervalS) * time.Second) return timers.heartbeatTicker.C } return nil } // processLogRecord handles individual log records, returning bytes written func (l *Logger) processLogRecord(record logRecord) int64 { c := l.getConfig() // Check if the record should process this record disableFile := c.DisableFile if !disableFile && !l.state.DiskStatusOK.Load() { l.state.DroppedLogs.Add(1) return 0 } // Serialize the log entry once format := c.Format data := l.serializer.serialize( format, record.Flags, record.TimeStamp, record.Level, record.Trace, record.Args, ) dataLen := int64(len(data)) // Mirror to stdout if enabled enableStdout := c.EnableStdout if enableStdout { if s := l.state.StdoutWriter.Load(); s != nil { if sinkWrapper, ok := s.(*sink); ok && sinkWrapper != nil { // Handle split mode if c.StdoutTarget == "split" { if record.Level >= LevelWarn { // Write WARN and ERROR to stderr _, _ = os.Stderr.Write(data) } else { // Write INFO and DEBUG to stdout _, _ = sinkWrapper.w.Write(data) } } else { // Write to the configured target (stdout or stderr) _, _ = sinkWrapper.w.Write(data) } } } } // Skip file operations if file output is disabled if disableFile { l.state.TotalLogsProcessed.Add(1) return dataLen // Return data length for adaptive interval calculations } // File rotation check currentFileSize := l.state.CurrentSize.Load() estimatedSize := currentFileSize + dataLen maxSizeMB := c.MaxSizeMB if maxSizeMB > 0 && estimatedSize > maxSizeMB*1024*1024 { if err := l.rotateLogFile(); err != nil { l.internalLog("failed to rotate log file: %v\n", err) // Account for the dropped log that triggered the failed rotation l.state.DroppedLogs.Add(1) return 0 } } // Write to file cfPtr := l.state.CurrentFile.Load() if currentLogFile, isFile := cfPtr.(*os.File); isFile && currentLogFile != nil { n, err := currentLogFile.Write(data) if err != nil { l.internalLog("failed to write to log file: %v\n", err) l.state.DroppedLogs.Add(1) l.performDiskCheck(true) return 0 } else { l.state.CurrentSize.Add(int64(n)) l.state.TotalLogsProcessed.Add(1) return int64(n) } } else { l.state.DroppedLogs.Add(1) return 0 } } // handleFlushTick handles the periodic flush timer tick func (l *Logger) handleFlushTick() { c := l.getConfig() enableSync := c.EnablePeriodicSync if enableSync { l.performSync() } } // handleFlushRequest handles an explicit flush request func (l *Logger) handleFlushRequest(confirmChan chan struct{}) { l.performSync() close(confirmChan) } // handleRetentionCheck performs file retention check and cleanup func (l *Logger) handleRetentionCheck() { c := l.getConfig() retentionPeriodHrs := c.RetentionPeriodHrs retentionDur := time.Duration(retentionPeriodHrs * float64(time.Hour)) if retentionDur > 0 { etPtr := l.state.EarliestFileTime.Load() if earliest, ok := etPtr.(time.Time); ok && !earliest.IsZero() { if time.Since(earliest) > retentionDur { if err := l.cleanExpiredLogs(earliest); err == nil { l.updateEarliestFileTime() } else { l.internalLog("failed to clean expired logs: %v\n", err) } } } else if !ok || earliest.IsZero() { l.updateEarliestFileTime() } } } // adjustDiskCheckInterval modifies the disk check interval based on logging activity func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Time, logsSinceLastCheck int64) { c := l.getConfig() enableAdaptive := c.EnableAdaptiveInterval if !enableAdaptive { return } elapsed := time.Since(lastCheckTime) if elapsed < minWaitTime { // Min arbitrary reasonable value elapsed = minWaitTime } logsPerSecond := float64(logsSinceLastCheck) / elapsed.Seconds() targetLogsPerSecond := float64(100) // Baseline diskCheckIntervalMs := c.DiskCheckIntervalMs currentDiskCheckInterval := time.Duration(diskCheckIntervalMs) * time.Millisecond // Calculate the new interval var newInterval time.Duration if logsPerSecond < targetLogsPerSecond/2 { // Load low -> increase interval newInterval = time.Duration(float64(currentDiskCheckInterval) * adaptiveIntervalFactor) } else if logsPerSecond > targetLogsPerSecond*2 { // Load high -> decrease interval newInterval = time.Duration(float64(currentDiskCheckInterval) * adaptiveSpeedUpFactor) } else { // No change needed if within normal range return } // Clamp interval using current config minCheckIntervalMs := c.MinCheckIntervalMs maxCheckIntervalMs := c.MaxCheckIntervalMs minCheckInterval := time.Duration(minCheckIntervalMs) * time.Millisecond maxCheckInterval := time.Duration(maxCheckIntervalMs) * time.Millisecond if newInterval < minCheckInterval { newInterval = minCheckInterval } if newInterval > maxCheckInterval { newInterval = maxCheckInterval } timers.diskCheckTicker.Reset(newInterval) } // handleHeartbeat processes a heartbeat timer tick func (l *Logger) handleHeartbeat() { c := l.getConfig() heartbeatLevel := c.HeartbeatLevel if heartbeatLevel >= 1 { l.logProcHeartbeat() } if heartbeatLevel >= 2 { l.logDiskHeartbeat() } if heartbeatLevel >= 3 { l.logSysHeartbeat() } } // logProcHeartbeat logs process/logger statistics heartbeat func (l *Logger) logProcHeartbeat() { processed := l.state.TotalLogsProcessed.Load() dropped := l.state.DroppedLogs.Load() sequence := l.state.HeartbeatSequence.Add(1) startTimeVal := l.state.LoggerStartTime.Load() var uptimeHours float64 = 0 if startTime, ok := startTimeVal.(time.Time); ok && !startTime.IsZero() { uptime := time.Since(startTime) uptimeHours = uptime.Hours() } procArgs := []any{ "type", "proc", "sequence", sequence, "uptime_hours", fmt.Sprintf("%.2f", uptimeHours), "processed_logs", processed, "dropped_logs", dropped, } l.writeHeartbeatRecord(LevelProc, procArgs) } // logDiskHeartbeat logs disk/file statistics heartbeat func (l *Logger) logDiskHeartbeat() { sequence := l.state.HeartbeatSequence.Load() rotations := l.state.TotalRotations.Load() deletions := l.state.TotalDeletions.Load() c := l.getConfig() dir := c.Directory ext := c.Extension currentSizeMB := float64(l.state.CurrentSize.Load()) / (1024 * 1024) // Current file size totalSizeMB := float64(-1.0) // Default error value fileCount := -1 // Default error value dirSize, err := l.getLogDirSize(dir, ext) if err == nil { totalSizeMB = float64(dirSize) / (1024 * 1024) } else { l.internalLog("warning - heartbeat failed to get dir size: %v\n", err) } count, err := l.getLogFileCount(dir, ext) if err == nil { fileCount = count } else { l.internalLog("warning - heartbeat failed to get file count: %v\n", err) } diskArgs := []any{ "type", "disk", "sequence", sequence, "rotated_files", rotations, "deleted_files", deletions, "total_log_size_mb", fmt.Sprintf("%.2f", totalSizeMB), "log_file_count", fileCount, "current_file_size_mb", fmt.Sprintf("%.2f", currentSizeMB), "disk_status_ok", l.state.DiskStatusOK.Load(), } // Add disk free space if we can get it freeSpace, err := l.getDiskFreeSpace(dir) if err == nil { freeSpaceMB := float64(freeSpace) / (1024 * 1024) diskArgs = append(diskArgs, "disk_free_mb", fmt.Sprintf("%.2f", freeSpaceMB)) } l.writeHeartbeatRecord(LevelDisk, diskArgs) } // logSysHeartbeat logs system/runtime statistics heartbeat func (l *Logger) logSysHeartbeat() { sequence := l.state.HeartbeatSequence.Load() var memStats runtime.MemStats runtime.ReadMemStats(&memStats) sysArgs := []any{ "type", "sys", "sequence", sequence, "alloc_mb", fmt.Sprintf("%.2f", float64(memStats.Alloc)/(1024*1024)), "sys_mb", fmt.Sprintf("%.2f", float64(memStats.Sys)/(1024*1024)), "num_gc", memStats.NumGC, "num_goroutine", runtime.NumGoroutine(), } // Write the heartbeat record l.writeHeartbeatRecord(LevelSys, sysArgs) } // writeHeartbeatRecord creates and sends a heartbeat log record through the main processing channel func (l *Logger) writeHeartbeatRecord(level int64, args []any) { if l.state.LoggerDisabled.Load() || l.state.ShutdownCalled.Load() { return } // Create heartbeat record with appropriate flags record := logRecord{ Flags: FlagDefault | FlagShowLevel, TimeStamp: time.Now(), Level: level, Trace: "", Args: args, unreportedDrops: 0, } // Send through the main processing channel l.sendLogRecord(record) }