// Files
// log/processor.go
//
// 507 lines
// 14 KiB
// Go
// FILE: processor.go
package log
import (
"fmt"
"os"
"runtime"
"time"
)
const (
	// reactiveCheckThresholdBytes is the number of bytes written since the
	// last disk check that forces an immediate (reactive) re-check.
	reactiveCheckThresholdBytes int64 = 10 * 1024 * 1024
	// Multiplicative factors used to adapt the disk check interval to load.
	adaptiveIntervalFactor float64 = 1.5 // Slow down (low load)
	adaptiveSpeedUpFactor float64 = 0.8 // Speed up (high load)
	// minWaitTime is the minimum elapsed-time floor used in rate calculations
	// throughout the package (avoids division by near-zero durations).
	minWaitTime = 10 * time.Millisecond
)
// processLogs is the main log processing loop, run in a dedicated goroutine.
// It drains the record channel, performs periodic flush, disk-check,
// retention, and heartbeat work, and exits (after a final sync) when the
// channel is closed. The ProcessorExited flag brackets the goroutine's
// lifetime for observers.
func (l *Logger) processLogs(ch <-chan logRecord) {
	l.state.ProcessorExited.Store(false)
	defer l.state.ProcessorExited.Store(true)
	// Set up timers and state variables
	timers := l.setupProcessingTimers()
	defer l.closeProcessingTimers(timers)
	c := l.getConfig()
	// Perform an initial disk check on startup (skip if file output is disabled)
	if !c.DisableFile {
		l.performDiskCheck(true)
	}
	// Send initial heartbeats immediately instead of waiting for the first
	// tick. handleHeartbeat applies the same level gating used on every tick,
	// so the dispatch logic lives in exactly one place.
	if c.HeartbeatLevel > 0 {
		l.handleHeartbeat()
	}
	// State for adaptive disk checks: bytes/logs written and the time of the
	// last successful check.
	var (
		bytesSinceLastCheck int64
		logsSinceLastCheck  int64
		lastCheckTime       = time.Now()
	)
	// --- Main Loop ---
	for {
		select {
		case record, ok := <-ch:
			if !ok {
				// Channel closed: flush any remaining data and exit.
				l.performSync()
				return
			}
			// Process the received log record
			bytesWritten := l.processLogRecord(record)
			if bytesWritten > 0 {
				// Update adaptive check counters
				bytesSinceLastCheck += bytesWritten
				logsSinceLastCheck++
				// Reactive trigger: a burst of writes forces an early disk
				// check without waiting for the periodic ticker.
				if bytesSinceLastCheck > reactiveCheckThresholdBytes {
					if l.performDiskCheck(false) {
						bytesSinceLastCheck = 0
						logsSinceLastCheck = 0
						lastCheckTime = time.Now()
					}
				}
			}
		case <-timers.flushTicker.C:
			l.handleFlushTick()
		case <-timers.diskCheckTicker.C:
			// Periodic disk check; on success, adapt the check interval to
			// the observed logging rate and reset the counters.
			if l.performDiskCheck(true) {
				l.adjustDiskCheckInterval(timers, lastCheckTime, logsSinceLastCheck)
				bytesSinceLastCheck = 0
				logsSinceLastCheck = 0
				lastCheckTime = time.Now()
			}
		case confirmChan := <-l.state.flushRequestChan:
			l.handleFlushRequest(confirmChan)
		case <-timers.retentionChan:
			l.handleRetentionCheck()
		case <-timers.heartbeatChan:
			l.handleHeartbeat()
		}
	}
}
// TimerSet bundles the tickers used by processLogs so they can be created
// and stopped together. The retention and heartbeat channels are nil when
// the corresponding feature is disabled; a nil channel never fires in a
// select, so their cases are effectively disabled too.
type TimerSet struct {
	flushTicker *time.Ticker // Periodic buffer flush
	diskCheckTicker *time.Ticker // Periodic disk health check
	retentionTicker *time.Ticker // Retention cleanup; nil if retention disabled
	heartbeatTicker *time.Ticker // Heartbeat emission; nil if heartbeats disabled
	retentionChan <-chan time.Time // retentionTicker.C, or nil when disabled
	heartbeatChan <-chan time.Time // heartbeatTicker.C, or nil when disabled
}
// setupProcessingTimers builds the full TimerSet used by the processing
// loop: the always-on flush and disk-check tickers plus the optional
// retention and heartbeat timers (nil channels when disabled).
func (l *Logger) setupProcessingTimers() *TimerSet {
	ts := &TimerSet{}
	cfg := l.getConfig()
	// Flush ticker: non-positive config values fall back to the default.
	intervalMs := cfg.FlushIntervalMs
	if intervalMs <= 0 {
		intervalMs = DefaultConfig().FlushIntervalMs
	}
	ts.flushTicker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
	// Optional retention timer (nil channel when retention is disabled).
	ts.retentionChan = l.setupRetentionTimer(ts)
	// Disk check ticker (always active).
	ts.diskCheckTicker = l.setupDiskCheckTimer()
	// Optional heartbeat timer (nil channel when heartbeats are disabled).
	ts.heartbeatChan = l.setupHeartbeatTimer(ts)
	return ts
}
// closeProcessingTimers stops all active timers in the set. The retention
// and heartbeat tickers may legitimately be nil (feature disabled); the
// flush and disk-check tickers are nil-guarded too, for consistency and so
// the function is safe on a partially initialized set.
func (l *Logger) closeProcessingTimers(timers *TimerSet) {
	if timers == nil {
		return
	}
	if timers.flushTicker != nil {
		timers.flushTicker.Stop()
	}
	if timers.diskCheckTicker != nil {
		timers.diskCheckTicker.Stop()
	}
	if timers.retentionTicker != nil {
		timers.retentionTicker.Stop()
	}
	if timers.heartbeatTicker != nil {
		timers.heartbeatTicker.Stop()
	}
}
// setupRetentionTimer starts the retention check ticker when both the
// retention period and the check interval are positive, returning its
// channel; otherwise it returns nil (retention disabled).
func (l *Logger) setupRetentionTimer(timers *TimerSet) <-chan time.Time {
	cfg := l.getConfig()
	period := time.Duration(cfg.RetentionPeriodHrs * float64(time.Hour))
	interval := time.Duration(cfg.RetentionCheckMins * float64(time.Minute))
	if period <= 0 || interval <= 0 {
		return nil
	}
	timers.retentionTicker = time.NewTicker(interval)
	// Prime the earliest-file timestamp so the first tick has data to act on.
	l.updateEarliestFileTime()
	return timers.retentionTicker.C
}
// setupDiskCheckTimer configures the periodic disk check ticker. A
// non-positive configured interval falls back to 5 seconds, and the result
// is clamped to the configured min/max bounds when those bounds are set.
func (l *Logger) setupDiskCheckTimer() *time.Ticker {
	c := l.getConfig()
	intervalMs := c.DiskCheckIntervalMs
	if intervalMs <= 0 {
		intervalMs = 5000 // Default: check every 5 seconds
	}
	interval := time.Duration(intervalMs) * time.Millisecond
	// Clamp to bounds, but only against positive (i.e. actually configured)
	// values: an unset MaxCheckIntervalMs of 0 would otherwise clamp the
	// interval to zero and make time.NewTicker panic.
	minInterval := time.Duration(c.MinCheckIntervalMs) * time.Millisecond
	maxInterval := time.Duration(c.MaxCheckIntervalMs) * time.Millisecond
	if minInterval > 0 && interval < minInterval {
		interval = minInterval
	}
	if maxInterval > 0 && interval > maxInterval {
		interval = maxInterval
	}
	return time.NewTicker(interval)
}
// setupHeartbeatTimer starts the heartbeat ticker when heartbeats are
// enabled (level > 0) and returns its channel; otherwise it returns nil.
// A non-positive interval falls back to the package default.
func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
	cfg := l.getConfig()
	if cfg.HeartbeatLevel <= 0 {
		return nil
	}
	seconds := cfg.HeartbeatIntervalS
	if seconds <= 0 {
		seconds = DefaultConfig().HeartbeatIntervalS
	}
	timers.heartbeatTicker = time.NewTicker(time.Duration(seconds) * time.Second)
	return timers.heartbeatTicker.C
}
// processLogRecord serializes one record and writes it to the configured
// sinks. It returns the number of bytes written to the log file, the
// serialized length when file output is disabled (so adaptive-interval
// accounting still works), or 0 when the record is dropped.
func (l *Logger) processLogRecord(record logRecord) int64 {
	c := l.getConfig()
	// Drop immediately if file output is enabled but the disk is known bad.
	// NOTE(review): this drops the record before the stdout mirror below —
	// confirm that losing stdout output during a disk outage is intended.
	disableFile := c.DisableFile
	if !disableFile && !l.state.DiskStatusOK.Load() {
		l.state.DroppedLogs.Add(1)
		return 0
	}
	// Serialize the log entry once; both sinks reuse the same bytes.
	format := c.Format
	data := l.serializer.serialize(
		format,
		record.Flags,
		record.TimeStamp,
		record.Level,
		record.Trace,
		record.Args,
	)
	dataLen := int64(len(data))
	// Mirror to stdout/stderr if enabled. Mirror write errors are
	// deliberately ignored: the file write below is the primary sink.
	enableStdout := c.EnableStdout
	if enableStdout {
		if s := l.state.StdoutWriter.Load(); s != nil {
			if sinkWrapper, ok := s.(*sink); ok && sinkWrapper != nil {
				// "split" mode routes records by severity.
				if c.StdoutTarget == "split" {
					if record.Level >= LevelWarn {
						// Write WARN and ERROR to stderr
						_, _ = os.Stderr.Write(data)
					} else {
						// Write INFO and DEBUG to stdout
						_, _ = sinkWrapper.w.Write(data)
					}
				} else {
					// Write to the configured target (stdout or stderr)
					_, _ = sinkWrapper.w.Write(data)
				}
			}
		}
	}
	// Skip file operations if file output is disabled
	if disableFile {
		l.state.TotalLogsProcessed.Add(1)
		return dataLen // Return data length for adaptive interval calculations
	}
	// Rotate first if this write would push the file past its size cap.
	currentFileSize := l.state.CurrentSize.Load()
	estimatedSize := currentFileSize + dataLen
	maxSizeMB := c.MaxSizeMB
	if maxSizeMB > 0 && estimatedSize > maxSizeMB*1024*1024 {
		if err := l.rotateLogFile(); err != nil {
			l.internalLog("failed to rotate log file: %v\n", err)
			// Account for the dropped log that triggered the failed rotation
			l.state.DroppedLogs.Add(1)
			return 0
		}
	}
	// Write to the current file; a write failure triggers an immediate disk
	// re-check so DiskStatusOK reflects reality for subsequent records.
	cfPtr := l.state.CurrentFile.Load()
	if currentLogFile, isFile := cfPtr.(*os.File); isFile && currentLogFile != nil {
		n, err := currentLogFile.Write(data)
		if err != nil {
			l.internalLog("failed to write to log file: %v\n", err)
			l.state.DroppedLogs.Add(1)
			l.performDiskCheck(true)
			return 0
		} else {
			l.state.CurrentSize.Add(int64(n))
			l.state.TotalLogsProcessed.Add(1)
			return int64(n)
		}
	} else {
		// No open file handle; nothing to write to.
		l.state.DroppedLogs.Add(1)
		return 0
	}
}
// handleFlushTick runs on each flush ticker tick and syncs the log file,
// but only when periodic sync is enabled in the configuration.
func (l *Logger) handleFlushTick() {
	if l.getConfig().EnablePeriodicSync {
		l.performSync()
	}
}
// handleFlushRequest services an explicit flush request: it syncs the log
// file and then closes the supplied channel to signal completion to the
// waiting caller.
func (l *Logger) handleFlushRequest(done chan struct{}) {
	l.performSync()
	close(done)
}
// handleRetentionCheck deletes expired log files once the oldest tracked
// file exceeds the configured retention period, then refreshes the cached
// earliest-file timestamp. A missing or zero timestamp just triggers a
// refresh so the next tick has data to act on.
func (l *Logger) handleRetentionCheck() {
	c := l.getConfig()
	retentionDur := time.Duration(c.RetentionPeriodHrs * float64(time.Hour))
	if retentionDur <= 0 {
		return // Retention disabled
	}
	earliest, ok := l.state.EarliestFileTime.Load().(time.Time)
	if !ok || earliest.IsZero() {
		// No usable cached timestamp yet; compute one for the next check.
		// (The original's `else if !ok || earliest.IsZero()` condition was
		// always true when reached — this is the same behavior, simplified.)
		l.updateEarliestFileTime()
		return
	}
	if time.Since(earliest) <= retentionDur {
		return // Oldest file is still within the retention window
	}
	if err := l.cleanExpiredLogs(earliest); err != nil {
		l.internalLog("failed to clean expired logs: %v\n", err)
		return
	}
	l.updateEarliestFileTime()
}
// adjustDiskCheckInterval retunes the disk-check ticker based on the logging
// rate observed since lastCheckTime: low rates stretch the interval, high
// rates shrink it, and the result is clamped to the configured bounds.
//
// NOTE(review): the "current" interval is always re-derived from
// c.DiskCheckIntervalMs rather than the ticker's last value, so adjustments
// do not accumulate across calls — confirm whether cumulative adaptation
// was intended.
func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Time, logsSinceLastCheck int64) {
	c := l.getConfig()
	if !c.EnableAdaptiveInterval {
		return
	}
	elapsed := time.Since(lastCheckTime)
	if elapsed < minWaitTime { // Floor avoids division by (near) zero below
		elapsed = minWaitTime
	}
	logsPerSecond := float64(logsSinceLastCheck) / elapsed.Seconds()
	const targetLogsPerSecond = float64(100) // Baseline load
	currentInterval := time.Duration(c.DiskCheckIntervalMs) * time.Millisecond
	// Calculate the new interval
	var newInterval time.Duration
	switch {
	case logsPerSecond < targetLogsPerSecond/2:
		// Load low -> check less often
		newInterval = time.Duration(float64(currentInterval) * adaptiveIntervalFactor)
	case logsPerSecond > targetLogsPerSecond*2:
		// Load high -> check more often
		newInterval = time.Duration(float64(currentInterval) * adaptiveSpeedUpFactor)
	default:
		return // Within the normal band; leave the ticker alone
	}
	// Clamp to configured bounds, ignoring non-positive (unset) bounds so an
	// unset MaxCheckIntervalMs of 0 cannot clamp the interval to zero —
	// Ticker.Reset panics on non-positive durations.
	minInterval := time.Duration(c.MinCheckIntervalMs) * time.Millisecond
	maxInterval := time.Duration(c.MaxCheckIntervalMs) * time.Millisecond
	if minInterval > 0 && newInterval < minInterval {
		newInterval = minInterval
	}
	if maxInterval > 0 && newInterval > maxInterval {
		newInterval = maxInterval
	}
	if newInterval <= 0 {
		return // Never reset the ticker to a non-positive interval
	}
	timers.diskCheckTicker.Reset(newInterval)
}
// handleHeartbeat emits the heartbeat records enabled by the configured
// level: 1 adds process stats, 2 adds disk stats, 3 adds system stats.
// Levels are cumulative, and records are always emitted in proc/disk/sys
// order.
func (l *Logger) handleHeartbeat() {
	level := l.getConfig().HeartbeatLevel
	if level < 1 {
		return
	}
	l.logProcHeartbeat()
	if level >= 2 {
		l.logDiskHeartbeat()
	}
	if level >= 3 {
		l.logSysHeartbeat()
	}
}
// logProcHeartbeat emits a process-level heartbeat record carrying the
// heartbeat sequence number (incremented here), logger uptime in hours,
// and the processed/dropped log counters.
func (l *Logger) logProcHeartbeat() {
	seq := l.state.HeartbeatSequence.Add(1)
	// Uptime stays zero unless a valid, non-zero start time was recorded.
	var uptimeHrs float64
	if start, ok := l.state.LoggerStartTime.Load().(time.Time); ok && !start.IsZero() {
		uptimeHrs = time.Since(start).Hours()
	}
	l.writeHeartbeatRecord(LevelProc, []any{
		"type", "proc",
		"sequence", seq,
		"uptime_hours", fmt.Sprintf("%.2f", uptimeHrs),
		"processed_logs", l.state.TotalLogsProcessed.Load(),
		"dropped_logs", l.state.DroppedLogs.Load(),
	})
}
// logDiskHeartbeat emits a disk-level heartbeat record with rotation and
// deletion counters, directory size and file count, the current file size,
// disk status, and — when obtainable — the free space on the log volume.
func (l *Logger) logDiskHeartbeat() {
	cfg := l.getConfig()
	dir, ext := cfg.Directory, cfg.Extension
	// Sentinel values (-1) mark metrics that could not be collected.
	totalMB := float64(-1.0)
	files := -1
	if size, err := l.getLogDirSize(dir, ext); err == nil {
		totalMB = float64(size) / (1024 * 1024)
	} else {
		l.internalLog("warning - heartbeat failed to get dir size: %v\n", err)
	}
	if n, err := l.getLogFileCount(dir, ext); err == nil {
		files = n
	} else {
		l.internalLog("warning - heartbeat failed to get file count: %v\n", err)
	}
	args := []any{
		"type", "disk",
		"sequence", l.state.HeartbeatSequence.Load(),
		"rotated_files", l.state.TotalRotations.Load(),
		"deleted_files", l.state.TotalDeletions.Load(),
		"total_log_size_mb", fmt.Sprintf("%.2f", totalMB),
		"log_file_count", files,
		"current_file_size_mb", fmt.Sprintf("%.2f", float64(l.state.CurrentSize.Load())/(1024*1024)),
		"disk_status_ok", l.state.DiskStatusOK.Load(),
	}
	// Free space is appended only when the platform query succeeds.
	if free, err := l.getDiskFreeSpace(dir); err == nil {
		args = append(args, "disk_free_mb", fmt.Sprintf("%.2f", float64(free)/(1024*1024)))
	}
	l.writeHeartbeatRecord(LevelDisk, args)
}
// logSysHeartbeat emits a system-level heartbeat record with Go runtime
// memory statistics (allocated and reserved bytes in MB, GC cycle count)
// and the current goroutine count.
func (l *Logger) logSysHeartbeat() {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	const bytesPerMB = 1024 * 1024
	l.writeHeartbeatRecord(LevelSys, []any{
		"type", "sys",
		"sequence", l.state.HeartbeatSequence.Load(),
		"alloc_mb", fmt.Sprintf("%.2f", float64(ms.Alloc)/bytesPerMB),
		"sys_mb", fmt.Sprintf("%.2f", float64(ms.Sys)/bytesPerMB),
		"num_gc", ms.NumGC,
		"num_goroutine", runtime.NumGoroutine(),
	})
}
// writeHeartbeatRecord builds a heartbeat log record at the given level and
// sends it through the main processing channel. Heartbeats are silently
// dropped once the logger has been disabled or shutdown has begun.
func (l *Logger) writeHeartbeatRecord(level int64, args []any) {
	if l.state.LoggerDisabled.Load() || l.state.ShutdownCalled.Load() {
		return
	}
	l.sendLogRecord(logRecord{
		Flags:           FlagDefault | FlagShowLevel,
		TimeStamp:       time.Now(),
		Level:           level,
		Trace:           "",
		Args:            args,
		unreportedDrops: 0,
	})
}