e1.10.0 Configuration refactored.
This commit is contained in:
70
processor.go
70
processor.go
@ -14,6 +14,8 @@ const (
|
||||
// Factors to adjust check interval
|
||||
adaptiveIntervalFactor float64 = 1.5 // Slow down
|
||||
adaptiveSpeedUpFactor float64 = 0.8 // Speed up
|
||||
// Minimum wait time used throughout the package
|
||||
minWaitTime = time.Duration(10 * time.Millisecond)
|
||||
)
|
||||
|
||||
// processLogs is the main log processing loop running in a separate goroutine
|
||||
@ -25,14 +27,15 @@ func (l *Logger) processLogs(ch <-chan logRecord) {
|
||||
timers := l.setupProcessingTimers()
|
||||
defer l.closeProcessingTimers(timers)
|
||||
|
||||
c := l.getConfig()
|
||||
|
||||
// Perform an initial disk check on startup (skip if file output is disabled)
|
||||
disableFile, _ := l.config.Bool("log.disable_file")
|
||||
if !disableFile {
|
||||
if !c.DisableFile {
|
||||
l.performDiskCheck(true)
|
||||
}
|
||||
|
||||
// Send initial heartbeats immediately instead of waiting for first tick
|
||||
heartbeatLevel, _ := l.config.Int64("log.heartbeat_level")
|
||||
heartbeatLevel := c.HeartbeatLevel
|
||||
if heartbeatLevel > 0 {
|
||||
if heartbeatLevel >= 1 {
|
||||
l.logProcHeartbeat()
|
||||
@ -114,10 +117,12 @@ type TimerSet struct {
|
||||
func (l *Logger) setupProcessingTimers() *TimerSet {
|
||||
timers := &TimerSet{}
|
||||
|
||||
c := l.getConfig()
|
||||
|
||||
// Set up flush timer
|
||||
flushInterval, _ := l.config.Int64("log.flush_interval_ms")
|
||||
flushInterval := c.FlushIntervalMs
|
||||
if flushInterval <= 0 {
|
||||
flushInterval = 100
|
||||
flushInterval = DefaultConfig().FlushIntervalMs
|
||||
}
|
||||
timers.flushTicker = time.NewTicker(time.Duration(flushInterval) * time.Millisecond)
|
||||
|
||||
@ -149,8 +154,9 @@ func (l *Logger) closeProcessingTimers(timers *TimerSet) {
|
||||
|
||||
// setupRetentionTimer configures the retention check timer if retention is enabled
|
||||
func (l *Logger) setupRetentionTimer(timers *TimerSet) <-chan time.Time {
|
||||
retentionPeriodHrs, _ := l.config.Float64("log.retention_period_hrs")
|
||||
retentionCheckMins, _ := l.config.Float64("log.retention_check_mins")
|
||||
c := l.getConfig()
|
||||
retentionPeriodHrs := c.RetentionPeriodHrs
|
||||
retentionCheckMins := c.RetentionCheckMins
|
||||
retentionDur := time.Duration(retentionPeriodHrs * float64(time.Hour))
|
||||
retentionCheckInterval := time.Duration(retentionCheckMins * float64(time.Minute))
|
||||
|
||||
@ -164,15 +170,16 @@ func (l *Logger) setupRetentionTimer(timers *TimerSet) <-chan time.Time {
|
||||
|
||||
// setupDiskCheckTimer configures the disk check timer
|
||||
func (l *Logger) setupDiskCheckTimer() *time.Ticker {
|
||||
diskCheckIntervalMs, _ := l.config.Int64("log.disk_check_interval_ms")
|
||||
c := l.getConfig()
|
||||
diskCheckIntervalMs := c.DiskCheckIntervalMs
|
||||
if diskCheckIntervalMs <= 0 {
|
||||
diskCheckIntervalMs = 5000
|
||||
}
|
||||
currentDiskCheckInterval := time.Duration(diskCheckIntervalMs) * time.Millisecond
|
||||
|
||||
// Ensure initial interval respects bounds
|
||||
minCheckIntervalMs, _ := l.config.Int64("log.min_check_interval_ms")
|
||||
maxCheckIntervalMs, _ := l.config.Int64("log.max_check_interval_ms")
|
||||
minCheckIntervalMs := c.MinCheckIntervalMs
|
||||
maxCheckIntervalMs := c.MaxCheckIntervalMs
|
||||
minCheckInterval := time.Duration(minCheckIntervalMs) * time.Millisecond
|
||||
maxCheckInterval := time.Duration(maxCheckIntervalMs) * time.Millisecond
|
||||
|
||||
@ -188,12 +195,13 @@ func (l *Logger) setupDiskCheckTimer() *time.Ticker {
|
||||
|
||||
// setupHeartbeatTimer configures the heartbeat timer if heartbeats are enabled
|
||||
func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
|
||||
heartbeatLevel, _ := l.config.Int64("log.heartbeat_level")
|
||||
c := l.getConfig()
|
||||
heartbeatLevel := c.HeartbeatLevel
|
||||
if heartbeatLevel > 0 {
|
||||
intervalS, _ := l.config.Int64("log.heartbeat_interval_s")
|
||||
intervalS := c.HeartbeatIntervalS
|
||||
// Make sure interval is positive
|
||||
if intervalS <= 0 {
|
||||
intervalS = 60 // Default to 60 seconds
|
||||
intervalS = DefaultConfig().HeartbeatIntervalS
|
||||
}
|
||||
timers.heartbeatTicker = time.NewTicker(time.Duration(intervalS) * time.Second)
|
||||
return timers.heartbeatTicker.C
|
||||
@ -203,15 +211,16 @@ func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
|
||||
|
||||
// processLogRecord handles individual log records, returning bytes written
|
||||
func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
c := l.getConfig()
|
||||
// Check if the record should process this record
|
||||
disableFile, _ := l.config.Bool("log.disable_file")
|
||||
disableFile := c.DisableFile
|
||||
if !disableFile && !l.state.DiskStatusOK.Load() {
|
||||
l.state.DroppedLogs.Add(1)
|
||||
return 0
|
||||
}
|
||||
|
||||
// Serialize the log entry once
|
||||
format, _ := l.config.String("log.format")
|
||||
format := c.Format
|
||||
data := l.serializer.serialize(
|
||||
format,
|
||||
record.Flags,
|
||||
@ -223,7 +232,7 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
dataLen := int64(len(data))
|
||||
|
||||
// Mirror to stdout if enabled
|
||||
enableStdout, _ := l.config.Bool("log.enable_stdout")
|
||||
enableStdout := c.EnableStdout
|
||||
if enableStdout {
|
||||
if s := l.state.StdoutWriter.Load(); s != nil {
|
||||
// Assert to concrete type: *sink
|
||||
@ -244,7 +253,7 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
currentFileSize := l.state.CurrentSize.Load()
|
||||
estimatedSize := currentFileSize + dataLen
|
||||
|
||||
maxSizeMB, _ := l.config.Int64("log.max_size_mb")
|
||||
maxSizeMB := c.MaxSizeMB
|
||||
if maxSizeMB > 0 && estimatedSize > maxSizeMB*1024*1024 {
|
||||
if err := l.rotateLogFile(); err != nil {
|
||||
l.internalLog("failed to rotate log file: %v\n", err)
|
||||
@ -276,7 +285,8 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
|
||||
// handleFlushTick handles the periodic flush timer tick
|
||||
func (l *Logger) handleFlushTick() {
|
||||
enableSync, _ := l.config.Bool("log.enable_periodic_sync")
|
||||
c := l.getConfig()
|
||||
enableSync := c.EnablePeriodicSync
|
||||
if enableSync {
|
||||
l.performSync()
|
||||
}
|
||||
@ -290,7 +300,8 @@ func (l *Logger) handleFlushRequest(confirmChan chan struct{}) {
|
||||
|
||||
// handleRetentionCheck performs file retention check and cleanup
|
||||
func (l *Logger) handleRetentionCheck() {
|
||||
retentionPeriodHrs, _ := l.config.Float64("log.retention_period_hrs")
|
||||
c := l.getConfig()
|
||||
retentionPeriodHrs := c.RetentionPeriodHrs
|
||||
retentionDur := time.Duration(retentionPeriodHrs * float64(time.Hour))
|
||||
|
||||
if retentionDur > 0 {
|
||||
@ -311,20 +322,21 @@ func (l *Logger) handleRetentionCheck() {
|
||||
|
||||
// adjustDiskCheckInterval modifies the disk check interval based on logging activity
|
||||
func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Time, logsSinceLastCheck int64) {
|
||||
enableAdaptive, _ := l.config.Bool("log.enable_adaptive_interval")
|
||||
c := l.getConfig()
|
||||
enableAdaptive := c.EnableAdaptiveInterval
|
||||
if !enableAdaptive {
|
||||
return
|
||||
}
|
||||
|
||||
elapsed := time.Since(lastCheckTime)
|
||||
if elapsed < 10*time.Millisecond { // Min arbitrary reasonable value
|
||||
elapsed = 10 * time.Millisecond
|
||||
if elapsed < minWaitTime { // Min arbitrary reasonable value
|
||||
elapsed = minWaitTime
|
||||
}
|
||||
|
||||
logsPerSecond := float64(logsSinceLastCheck) / elapsed.Seconds()
|
||||
targetLogsPerSecond := float64(100) // Baseline
|
||||
|
||||
diskCheckIntervalMs, _ := l.config.Int64("log.disk_check_interval_ms")
|
||||
diskCheckIntervalMs := c.DiskCheckIntervalMs
|
||||
currentDiskCheckInterval := time.Duration(diskCheckIntervalMs) * time.Millisecond
|
||||
|
||||
// Calculate the new interval
|
||||
@ -339,8 +351,8 @@ func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Ti
|
||||
}
|
||||
|
||||
// Clamp interval using current config
|
||||
minCheckIntervalMs, _ := l.config.Int64("log.min_check_interval_ms")
|
||||
maxCheckIntervalMs, _ := l.config.Int64("log.max_check_interval_ms")
|
||||
minCheckIntervalMs := c.MinCheckIntervalMs
|
||||
maxCheckIntervalMs := c.MaxCheckIntervalMs
|
||||
minCheckInterval := time.Duration(minCheckIntervalMs) * time.Millisecond
|
||||
maxCheckInterval := time.Duration(maxCheckIntervalMs) * time.Millisecond
|
||||
|
||||
@ -356,7 +368,8 @@ func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Ti
|
||||
|
||||
// handleHeartbeat processes a heartbeat timer tick
|
||||
func (l *Logger) handleHeartbeat() {
|
||||
heartbeatLevel, _ := l.config.Int64("log.heartbeat_level")
|
||||
c := l.getConfig()
|
||||
heartbeatLevel := c.HeartbeatLevel
|
||||
|
||||
if heartbeatLevel >= 1 {
|
||||
l.logProcHeartbeat()
|
||||
@ -401,8 +414,9 @@ func (l *Logger) logDiskHeartbeat() {
|
||||
rotations := l.state.TotalRotations.Load()
|
||||
deletions := l.state.TotalDeletions.Load()
|
||||
|
||||
dir, _ := l.config.String("log.directory")
|
||||
ext, _ := l.config.String("log.extension")
|
||||
c := l.getConfig()
|
||||
dir := c.Directory
|
||||
ext := c.Extension
|
||||
currentSizeMB := float64(l.state.CurrentSize.Load()) / (1024 * 1024) // Current file size
|
||||
totalSizeMB := float64(-1.0) // Default error value
|
||||
fileCount := -1 // Default error value
|
||||
|
||||
Reference in New Issue
Block a user