e1.2.1 Readme update and comment cleanup.
This commit is contained in:
30
processor.go
30
processor.go
@ -52,7 +52,6 @@ func (l *Logger) processLogs(ch <-chan logRecord) {
|
||||
select {
|
||||
case record, ok := <-ch:
|
||||
if !ok {
|
||||
// Channel closed: Perform final sync and exit
|
||||
l.performSync()
|
||||
return
|
||||
}
|
||||
@ -206,10 +205,9 @@ func (l *Logger) setupHeartbeatTimer(timers *TimerSet) <-chan time.Time {
|
||||
func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
if !l.state.DiskStatusOK.Load() {
|
||||
l.state.DroppedLogs.Add(1)
|
||||
return 0 // Skip processing if disk known to be unavailable
|
||||
return 0
|
||||
}
|
||||
|
||||
// Serialize the record
|
||||
format, _ := l.config.String("log.format")
|
||||
data := l.serializer.serialize(
|
||||
format,
|
||||
@ -221,7 +219,6 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
)
|
||||
dataLen := int64(len(data))
|
||||
|
||||
// Check for rotation
|
||||
currentFileSize := l.state.CurrentSize.Load()
|
||||
estimatedSize := currentFileSize + dataLen
|
||||
|
||||
@ -232,14 +229,13 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
}
|
||||
}
|
||||
|
||||
// Write to the current log file
|
||||
cfPtr := l.state.CurrentFile.Load()
|
||||
if currentLogFile, isFile := cfPtr.(*os.File); isFile && currentLogFile != nil {
|
||||
n, err := currentLogFile.Write(data)
|
||||
if err != nil {
|
||||
fmtFprintf(os.Stderr, "log: failed to write to log file: %v\n", err)
|
||||
l.state.DroppedLogs.Add(1)
|
||||
l.performDiskCheck(true) // Force check if write fails
|
||||
l.performDiskCheck(true)
|
||||
return 0
|
||||
} else {
|
||||
l.state.CurrentSize.Add(int64(n))
|
||||
@ -247,7 +243,7 @@ func (l *Logger) processLogRecord(record logRecord) int64 {
|
||||
return int64(n)
|
||||
}
|
||||
} else {
|
||||
l.state.DroppedLogs.Add(1) // File pointer somehow nil
|
||||
l.state.DroppedLogs.Add(1)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
@ -263,7 +259,7 @@ func (l *Logger) handleFlushTick() {
|
||||
// handleFlushRequest handles an explicit flush request
|
||||
func (l *Logger) handleFlushRequest(confirmChan chan struct{}) {
|
||||
l.performSync()
|
||||
close(confirmChan) // Signal completion back to the Flush caller
|
||||
close(confirmChan)
|
||||
}
|
||||
|
||||
// handleRetentionCheck performs file retention check and cleanup
|
||||
@ -302,7 +298,6 @@ func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Ti
|
||||
logsPerSecond := float64(logsSinceLastCheck) / elapsed.Seconds()
|
||||
targetLogsPerSecond := float64(100) // Baseline
|
||||
|
||||
// Get current disk check interval from config
|
||||
diskCheckIntervalMs, _ := l.config.Int64("log.disk_check_interval_ms")
|
||||
currentDiskCheckInterval := time.Duration(diskCheckIntervalMs) * time.Millisecond
|
||||
|
||||
@ -330,7 +325,6 @@ func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Ti
|
||||
newInterval = maxCheckInterval
|
||||
}
|
||||
|
||||
// Reset the ticker with the new interval
|
||||
timers.diskCheckTicker.Reset(newInterval)
|
||||
}
|
||||
|
||||
@ -338,7 +332,6 @@ func (l *Logger) adjustDiskCheckInterval(timers *TimerSet, lastCheckTime time.Ti
|
||||
func (l *Logger) handleHeartbeat() {
|
||||
heartbeatLevel, _ := l.config.Int64("log.heartbeat_level")
|
||||
|
||||
// Process heartbeat based on configured level
|
||||
if heartbeatLevel >= 1 {
|
||||
l.logProcHeartbeat()
|
||||
}
|
||||
@ -354,12 +347,10 @@ func (l *Logger) handleHeartbeat() {
|
||||
|
||||
// logProcHeartbeat logs process/logger statistics heartbeat
|
||||
func (l *Logger) logProcHeartbeat() {
|
||||
// 1. Gather process/logger stats
|
||||
processed := l.state.TotalLogsProcessed.Load()
|
||||
dropped := l.state.DroppedLogs.Load()
|
||||
sequence := l.state.HeartbeatSequence.Add(1) // Increment and get sequence number
|
||||
|
||||
// Calculate uptime
|
||||
startTimeVal := l.state.LoggerStartTime.Load()
|
||||
var uptimeHours float64 = 0
|
||||
if startTime, ok := startTimeVal.(time.Time); ok && !startTime.IsZero() {
|
||||
@ -367,7 +358,6 @@ func (l *Logger) logProcHeartbeat() {
|
||||
uptimeHours = uptime.Hours()
|
||||
}
|
||||
|
||||
// 2. Format Args
|
||||
procArgs := []any{
|
||||
"type", "proc",
|
||||
"sequence", sequence,
|
||||
@ -376,7 +366,6 @@ func (l *Logger) logProcHeartbeat() {
|
||||
"dropped_logs", dropped,
|
||||
}
|
||||
|
||||
// 3. Write the heartbeat record
|
||||
l.writeHeartbeatRecord(LevelProc, procArgs)
|
||||
}
|
||||
|
||||
@ -386,7 +375,6 @@ func (l *Logger) logDiskHeartbeat() {
|
||||
rotations := l.state.TotalRotations.Load()
|
||||
deletions := l.state.TotalDeletions.Load()
|
||||
|
||||
// Get file system stats
|
||||
dir, _ := l.config.String("log.directory")
|
||||
ext, _ := l.config.String("log.extension")
|
||||
currentSizeMB := float64(l.state.CurrentSize.Load()) / (1024 * 1024) // Current file size
|
||||
@ -407,7 +395,6 @@ func (l *Logger) logDiskHeartbeat() {
|
||||
fmtFprintf(os.Stderr, "log: warning - heartbeat failed to get file count: %v\n", err)
|
||||
}
|
||||
|
||||
// Format Args
|
||||
diskArgs := []any{
|
||||
"type", "disk",
|
||||
"sequence", sequence,
|
||||
@ -426,7 +413,6 @@ func (l *Logger) logDiskHeartbeat() {
|
||||
diskArgs = append(diskArgs, "disk_free_mb", fmt.Sprintf("%.2f", freeSpaceMB))
|
||||
}
|
||||
|
||||
// Write the heartbeat record
|
||||
l.writeHeartbeatRecord(LevelDisk, diskArgs)
|
||||
}
|
||||
|
||||
@ -434,11 +420,9 @@ func (l *Logger) logDiskHeartbeat() {
|
||||
func (l *Logger) logSysHeartbeat() {
|
||||
sequence := l.state.HeartbeatSequence.Load()
|
||||
|
||||
// Get memory stats
|
||||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
// Format Args
|
||||
sysArgs := []any{
|
||||
"type", "sys",
|
||||
"sequence", sequence,
|
||||
@ -454,22 +438,17 @@ func (l *Logger) logSysHeartbeat() {
|
||||
|
||||
// writeHeartbeatRecord handles the common logic for writing a heartbeat record
|
||||
func (l *Logger) writeHeartbeatRecord(level int64, args []any) {
|
||||
// Skip if logger disabled or shutting down
|
||||
if l.state.LoggerDisabled.Load() || l.state.ShutdownCalled.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if disk known to be unavailable
|
||||
if !l.state.DiskStatusOK.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// 1. Serialize the record
|
||||
format, _ := l.config.String("log.format")
|
||||
// Use FlagDefault | FlagShowLevel so Level appears in the output
|
||||
hbData := l.serializer.serialize(format, FlagDefault|FlagShowLevel, time.Now(), level, "", args)
|
||||
|
||||
// 2. Write the record
|
||||
cfPtr := l.state.CurrentFile.Load()
|
||||
if cfPtr == nil {
|
||||
fmtFprintf(os.Stderr, "log: error - current file handle is nil during heartbeat\n")
|
||||
@ -482,7 +461,6 @@ func (l *Logger) writeHeartbeatRecord(level int64, args []any) {
|
||||
return
|
||||
}
|
||||
|
||||
// Write with a single retry attempt
|
||||
n, err := currentLogFile.Write(hbData)
|
||||
if err != nil {
|
||||
fmtFprintf(os.Stderr, "log: failed to write heartbeat: %v\n", err)
|
||||
|
||||
Reference in New Issue
Block a user