From ce6e3b7ffc32ac0b0837f3addcae4587806dc5da1758933a152845cec0fda4e1 Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Mon, 21 Jul 2025 21:28:45 -0400 Subject: [PATCH] e3.1.0 Refactored lifecycle (configuration, drop report, heartbeat). --- builder.go | 12 +++ compat/compat_test.go | 51 ++++++++++-- config.go | 33 ++++++++ go.mod | 17 +--- go.sum | 32 -------- heartbeat.go | 28 ++++--- integration_test.go | 26 +++++-- lifecycle_test.go | 168 ++++++++++++++++++++++++++++++++++++++++ logger.go | 176 ++++++++++++++++++++++++++++-------------- logger_test.go | 32 +++++--- processor.go | 3 +- processor_test.go | 144 +++++++++++++++++++++++++++++++--- record.go | 66 +++++++--------- state.go | 8 +- storage.go | 8 +- type.go | 11 ++- 16 files changed, 615 insertions(+), 200 deletions(-) create mode 100644 lifecycle_test.go diff --git a/builder.go b/builder.go index aa466b2..85ff7c7 100644 --- a/builder.go +++ b/builder.go @@ -52,6 +52,12 @@ func (b *Builder) LevelString(level string) *Builder { return b } +// Name sets the base name of the log file. +func (b *Builder) Name(name string) *Builder { + b.cfg.Name = name + return b +} + // Directory sets the log directory. func (b *Builder) Directory(dir string) *Builder { b.cfg.Directory = dir @@ -64,6 +70,12 @@ func (b *Builder) Format(format string) *Builder { return b } +// Extension sets the log file extension. +func (b *Builder) Extension(ext string) *Builder { + b.cfg.Extension = ext + return b +} + // BufferSize sets the channel buffer size. func (b *Builder) BufferSize(size int64) *Builder { b.cfg.BufferSize = size diff --git a/compat/compat_test.go b/compat/compat_test.go index 37367af..22e74d2 100644 --- a/compat/compat_test.go +++ b/compat/compat_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "os" "path/filepath" + "strings" "testing" "time" @@ -25,6 +26,10 @@ func createTestCompatBuilder(t *testing.T) (*Builder, *log.Logger, string) { Build() require.NoError(t, err) + // Start the logger before using it. 
+ err = appLogger.Start() + require.NoError(t, err) + builder := NewBuilder().WithLogger(appLogger) return builder, appLogger, tmpDir } @@ -81,6 +86,8 @@ func TestCompatBuilder(t *testing.T) { assert.NotNil(t, fasthttpAdapter) logger1, _ := builder.GetLogger() + // The builder now creates AND starts the logger internally if needed. + // We need to defer shutdown to clean up resources. defer logger1.Shutdown() }) } @@ -104,7 +111,8 @@ func TestGnetAdapter(t *testing.T) { err = logger.Flush(time.Second) require.NoError(t, err) - lines := readLogFile(t, tmpDir, 5) + // The "Logger started" message is also logged, so we expect 6 lines. + lines := readLogFile(t, tmpDir, 6) // Define expected log data. The order in the "fields" array is fixed by the adapter call. expected := []struct{ level, msg string }{ @@ -115,7 +123,16 @@ func TestGnetAdapter(t *testing.T) { {"ERROR", "gnet fatal id=5"}, } - for i, line := range lines { + // Filter out the "Logger started" line + var logLines []string + for _, line := range lines { + if !strings.Contains(line, "Logger started") { + logLines = append(logLines, line) + } + } + require.Len(t, logLines, 5, "Should have 5 gnet log lines after filtering") + + for i, line := range logLines { var entry map[string]interface{} err := json.Unmarshal([]byte(line), &entry) require.NoError(t, err, "Failed to parse log line: %s", line) @@ -145,10 +162,21 @@ func TestStructuredGnetAdapter(t *testing.T) { err = logger.Flush(time.Second) require.NoError(t, err) - lines := readLogFile(t, tmpDir, 1) + // The "Logger started" message is also logged, so we expect 2 lines. 
+ lines := readLogFile(t, tmpDir, 2) + + // Find our specific log line + var logLine string + for _, line := range lines { + if strings.Contains(line, "request served") { + logLine = line + break + } + } + require.NotEmpty(t, logLine, "Did not find the structured gnet log line") var entry map[string]interface{} - err = json.Unmarshal([]byte(lines[0]), &entry) + err = json.Unmarshal([]byte(logLine), &entry) require.NoError(t, err) // The structured adapter parses keys and values, so we check them directly. @@ -178,17 +206,26 @@ func TestFastHTTPAdapter(t *testing.T) { "an error occurred while processing", } for _, msg := range testMessages { - // FIX: Use a constant format string to prevent build errors from `go vet`. adapter.Printf("%s", msg) } err = logger.Flush(time.Second) require.NoError(t, err) - lines := readLogFile(t, tmpDir, 4) + // Expect 4 test messages + 1 "Logger started" message + lines := readLogFile(t, tmpDir, 5) expectedLevels := []string{"INFO", "DEBUG", "WARN", "ERROR"} - for i, line := range lines { + // Filter out the "Logger started" line + var logLines []string + for _, line := range lines { + if !strings.Contains(line, "Logger started") { + logLines = append(logLines, line) + } + } + require.Len(t, logLines, 4, "Should have 4 fasthttp log lines after filtering") + + for i, line := range logLines { var entry map[string]interface{} err := json.Unmarshal([]byte(line), &entry) require.NoError(t, err, "Failed to parse log line: %s", line) diff --git a/config.go b/config.go index 3cf9682..c393661 100644 --- a/config.go +++ b/config.go @@ -346,6 +346,39 @@ func applyConfigField(cfg *Config, key, value string) error { return nil } +// configRequiresRestart checks if config changes require processor restart +func configRequiresRestart(oldCfg, newCfg *Config) bool { + // Channel size change requires restart + if oldCfg.BufferSize != newCfg.BufferSize { + return true + } + + // File output changes require restart + if oldCfg.DisableFile != 
newCfg.DisableFile { + return true + } + + // Directory or file naming changes require restart + if oldCfg.Directory != newCfg.Directory || + oldCfg.Name != newCfg.Name || + oldCfg.Extension != newCfg.Extension { + return true + } + + // Timer changes require restart + if oldCfg.FlushIntervalMs != newCfg.FlushIntervalMs || + oldCfg.DiskCheckIntervalMs != newCfg.DiskCheckIntervalMs || + oldCfg.EnableAdaptiveInterval != newCfg.EnableAdaptiveInterval || + oldCfg.HeartbeatIntervalS != newCfg.HeartbeatIntervalS || + oldCfg.HeartbeatLevel != newCfg.HeartbeatLevel || + oldCfg.RetentionCheckMins != newCfg.RetentionCheckMins || + oldCfg.RetentionPeriodHrs != newCfg.RetentionPeriodHrs { + return true + } + + return false +} + // combineConfigErrors combines multiple configuration errors into a single error. func combineConfigErrors(errors []error) error { if len(errors) == 0 { diff --git a/go.mod b/go.mod index 19e9792..f9ea05c 100644 --- a/go.mod +++ b/go.mod @@ -4,24 +4,13 @@ go 1.24.5 require ( github.com/davecgh/go-spew v1.1.1 - github.com/lixenwraith/config v0.0.0-20250720060932-619500728e68 - github.com/panjf2000/gnet/v2 v2.9.1 github.com/stretchr/testify v1.10.0 - github.com/valyala/fasthttp v1.64.0 + ) require ( - github.com/BurntSushi/toml v1.5.0 // indirect - github.com/andybalholm/brotli v1.2.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect - golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.34.0 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/mitchellh/mapstructure => github.com/go-viper/mapstructure v1.6.0 diff --git a/go.sum b/go.sum index 6d0a4fb..44274b8 100644 --- 
a/go.sum +++ b/go.sum @@ -1,42 +1,10 @@ -github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= -github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= -github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= -github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/lixenwraith/config v0.0.0-20250720060932-619500728e68 h1:icxe+FleqQgope6Fum8xs/PBNApDZslFqjD65yUEsds= -github.com/lixenwraith/config v0.0.0-20250720060932-619500728e68/go.mod h1:F8ieHeZgOCPsoym5eynx4kjupfLXBpvJfnX1GzX++EA= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= -github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= -github.com/panjf2000/gnet/v2 v2.9.1 h1:bKewICy/0xnQ9PMzNaswpe/Ah14w1TrRk91LHTcbIlA= -github.com/panjf2000/gnet/v2 v2.9.1/go.mod h1:WQTxDWYuQ/hz3eccH0FN32IVuvZ19HewEWx0l62fx7E= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool 
v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.64.0 h1:QBygLLQmiAyiXuRhthf0tuRkqAFcrC42dckN2S+N3og= -github.com/valyala/fasthttp v1.64.0/go.mod h1:dGmFxwkWXSK0NbOSJuF7AMVzU+lkHz0wQVvVITv2UQA= -github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= -github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= -golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= -gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/heartbeat.go b/heartbeat.go index 5eaafb6..92c78a8 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -28,7 +28,6 @@ func (l *Logger) handleHeartbeat() { // logProcHeartbeat logs process/logger statistics heartbeat func (l *Logger) 
logProcHeartbeat() { processed := l.state.TotalLogsProcessed.Load() - dropped := l.state.DroppedLogs.Load() sequence := l.state.HeartbeatSequence.Add(1) startTimeVal := l.state.LoggerStartTime.Load() @@ -38,12 +37,25 @@ func (l *Logger) logProcHeartbeat() { uptimeHours = uptime.Hours() } + // Get total drops (persistent through logger instance lifecycle) + totalDropped := l.state.TotalDroppedLogs.Load() + + // Atomically get and reset interval drops + // NOTE: If the PROC heartbeat record is itself dropped, its interval count is lost; only the total counter retains those drops + // By design, the dropped heartbeat record is not parsed to restore the interval count + droppedInInterval := l.state.DroppedLogs.Swap(0) + procArgs := []any{ "type", "proc", "sequence", sequence, "uptime_hours", fmt.Sprintf("%.2f", uptimeHours), "processed_logs", processed, - "dropped_logs", dropped, + "total_dropped_logs", totalDropped, + } + + // Add interval (since last proc heartbeat) drops if > 0 + if droppedInInterval > 0 { + procArgs = append(procArgs, "dropped_since_last", droppedInInterval) } l.writeHeartbeatRecord(LevelProc, procArgs) @@ -125,14 +137,12 @@ func (l *Logger) writeHeartbeatRecord(level int64, args []any) { // Create heartbeat record with appropriate flags record := logRecord{ - Flags: FlagDefault | FlagShowLevel, - TimeStamp: time.Now(), - Level: level, - Trace: "", - Args: args, - unreportedDrops: 0, + Flags: FlagDefault | FlagShowLevel, + TimeStamp: time.Now(), + Level: level, + Trace: "", + Args: args, } - // Send through the main processing channel l.sendLogRecord(record) } \ No newline at end of file diff --git a/integration_test.go b/integration_test.go index bbc9c1e..1b1d091 100644 --- a/integration_test.go +++ b/integration_test.go @@ -30,6 +30,10 @@ func TestFullLifecycle(t *testing.T) { require.NoError(t, err, "Logger creation with builder should succeed") require.NotNil(t, logger) + // Start the logger before use. 
+ err = logger.Start() + require.NoError(t, err) + // Defer shutdown right after successful creation defer func() { err := logger.Shutdown(2 * time.Second) @@ -97,7 +101,7 @@ func TestConcurrentOperations(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 3; i++ { - err := logger.ApplyConfigString(fmt.Sprintf("buffer_size=%d", 100+i*100)) + err := logger.ApplyConfigString(fmt.Sprintf("trace_depth=%d", i)) assert.NoError(t, err) time.Sleep(50 * time.Millisecond) } @@ -137,6 +141,9 @@ func TestErrorRecovery(t *testing.T) { err := logger.ApplyConfig(cfg) require.NoError(t, err) + // Small delay to ensure the processor has time to react if needed + time.Sleep(100 * time.Millisecond) + // Should detect disk space issue during the check isOK := logger.performDiskCheck(true) assert.False(t, isOK, "Disk check should fail when min free space is not met") @@ -145,14 +152,21 @@ func TestErrorRecovery(t *testing.T) { // Small delay to ensure the processor has time to react if needed time.Sleep(100 * time.Millisecond) - // Logs should be dropped when disk status is not OK preDropped := logger.state.DroppedLogs.Load() logger.Info("this log entry should be dropped") - // Small delay to let the log processor attempt to process the record - time.Sleep(100 * time.Millisecond) + var postDropped uint64 + var success bool + // Poll for up to 500ms for the async processor to update the state. 
+ for i := 0; i < 50; i++ { + postDropped = logger.state.DroppedLogs.Load() + if postDropped > preDropped { + success = true + break + } + time.Sleep(10 * time.Millisecond) + } - postDropped := logger.state.DroppedLogs.Load() - assert.Greater(t, postDropped, preDropped, "Dropped log count should increase") + require.True(t, success, "Dropped log count should have increased after logging with disk full") }) } \ No newline at end of file diff --git a/lifecycle_test.go b/lifecycle_test.go new file mode 100644 index 0000000..551465c --- /dev/null +++ b/lifecycle_test.go @@ -0,0 +1,168 @@ +// FILE: lixenwraith/log/lifecycle_test.go +package log + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStartStopLifecycle(t *testing.T) { + logger, _ := createTestLogger(t) // Starts the logger by default + + assert.True(t, logger.state.Started.Load(), "Logger should be in a started state") + + // Stop the logger + err := logger.Stop() + require.NoError(t, err) + assert.False(t, logger.state.Started.Load(), "Logger should be in a stopped state after Stop()") + + // Start it again + err = logger.Start() + require.NoError(t, err) + assert.True(t, logger.state.Started.Load(), "Logger should be in a started state after restart") + + logger.Shutdown() +} + +func TestStartAlreadyStarted(t *testing.T) { + logger, _ := createTestLogger(t) + defer logger.Shutdown() + + assert.True(t, logger.state.Started.Load()) + + // Calling Start() on an already started logger should be a no-op and return no error + err := logger.Start() + assert.NoError(t, err) + assert.True(t, logger.state.Started.Load()) +} + +func TestStopAlreadyStopped(t *testing.T) { + logger, _ := createTestLogger(t) + + // Stop it once + err := logger.Stop() + require.NoError(t, err) + assert.False(t, logger.state.Started.Load()) + + // Calling Stop() on an already stopped logger should be a no-op and return no error + err = 
logger.Stop() + assert.NoError(t, err) + assert.False(t, logger.state.Started.Load()) + + logger.Shutdown() +} + +func TestStopReconfigureRestart(t *testing.T) { + tmpDir := t.TempDir() + logger := NewLogger() + + // Initial config: txt format + cfg1 := DefaultConfig() + cfg1.Directory = tmpDir + cfg1.Format = "txt" + cfg1.ShowTimestamp = false + err := logger.ApplyConfig(cfg1) + require.NoError(t, err) + + // Start and log + err = logger.Start() + require.NoError(t, err) + logger.Info("first message") + logger.Flush(time.Second) + + // Stop the logger + err = logger.Stop() + require.NoError(t, err) + + // Reconfigure: json format + cfg2 := logger.GetConfig() + cfg2.Format = "json" + err = logger.ApplyConfig(cfg2) + require.NoError(t, err) + + // Restart and log + err = logger.Start() + require.NoError(t, err) + logger.Info("second message") + logger.Shutdown(time.Second) + + // Verify content + content, err := os.ReadFile(filepath.Join(tmpDir, "log.log")) + require.NoError(t, err) + strContent := string(content) + + assert.Contains(t, strContent, "INFO first message", "Should contain the log from the first configuration") + assert.Contains(t, strContent, `"fields":["second message"]`, "Should contain the log from the second (JSON) configuration") +} + +func TestLoggingOnStoppedLogger(t *testing.T) { + logger, tmpDir := createTestLogger(t) + + // Log something while running + logger.Info("this should be logged") + logger.Flush(time.Second) + + // Stop the logger + err := logger.Stop() + require.NoError(t, err) + + // Attempt to log while stopped + logger.Warn("this should NOT be logged") + + // Shutdown (which flushes) + logger.Shutdown(time.Second) + + content, err := os.ReadFile(filepath.Join(tmpDir, "log.log")) + require.NoError(t, err) + + assert.Contains(t, string(content), "this should be logged") + assert.NotContains(t, string(content), "this should NOT be logged") +} + +func TestFlushOnStoppedLogger(t *testing.T) { + logger, _ := createTestLogger(t) + + // 
Stop the logger + err := logger.Stop() + require.NoError(t, err) + + // Flush should return an error + err = logger.Flush(time.Second) + assert.Error(t, err) + assert.Contains(t, err.Error(), "logger not started") + + logger.Shutdown() +} + +func TestShutdownLifecycle(t *testing.T) { + logger, _ := createTestLogger(t) + + assert.True(t, logger.state.Started.Load()) + assert.True(t, logger.state.IsInitialized.Load()) + + // Shutdown is a terminal state + err := logger.Shutdown() + require.NoError(t, err) + + assert.True(t, logger.state.ShutdownCalled.Load()) + assert.False(t, logger.state.IsInitialized.Load(), "Shutdown should de-initialize the logger") + assert.False(t, logger.state.Started.Load(), "Shutdown should stop the logger") + + // Attempting to start again should fail because it's no longer initialized + err = logger.Start() + assert.Error(t, err) + assert.Contains(t, err.Error(), "logger not initialized") + + // Logging should be a silent no-op + logger.Info("this will not be logged") + + // Flush should fail + err = logger.Flush(time.Second) + assert.Error(t, err) + assert.Contains(t, err.Error(), "not initialized") +} \ No newline at end of file diff --git a/logger.go b/logger.go index 0cc171e..2525b63 100644 --- a/logger.go +++ b/logger.go @@ -102,10 +102,96 @@ func (l *Logger) GetConfig() *Config { return l.getConfig().Clone() } +// Start begins log processing. Safe to call multiple times. +// Returns error if logger is not initialized. 
+func (l *Logger) Start() error { + if !l.state.IsInitialized.Load() { + return fmtErrorf("logger not initialized, call ApplyConfig first") + } + + // Check if processor didn't exit cleanly last time + if l.state.Started.Load() && !l.state.ProcessorExited.Load() { + // Force stop to clean up + l.internalLog("warning - processor still running from previous start, forcing stop\n") + if err := l.Stop(); err != nil { + return fmtErrorf("failed to stop hung processor: %w", err) + } + } + + // Only start if not already started + if l.state.Started.CompareAndSwap(false, true) { + cfg := l.getConfig() + + // Create log channel + logChannel := make(chan logRecord, cfg.BufferSize) + l.state.ActiveLogChannel.Store(logChannel) + + // Start processor + l.state.ProcessorExited.Store(false) + go l.processLogs(logChannel) + + // Log startup if file output enabled + if !cfg.DisableFile { + startRecord := logRecord{ + Flags: FlagDefault, + TimeStamp: time.Now(), + Level: LevelInfo, + Args: []any{"Logger started"}, + } + l.sendLogRecord(startRecord) + } + } + + return nil +} + +// Stop halts log processing. Can be restarted with Start(). +// Returns nil if already stopped. 
+func (l *Logger) Stop(timeout ...time.Duration) error { + if !l.state.Started.CompareAndSwap(true, false) { + return nil // Already stopped + } + + // Calculate effective timeout + var effectiveTimeout time.Duration + if len(timeout) > 0 { + effectiveTimeout = timeout[0] + } else { + cfg := l.getConfig() + effectiveTimeout = 2 * time.Duration(cfg.FlushIntervalMs) * time.Millisecond + } + + // Get current channel and close it + ch := l.getCurrentLogChannel() + if ch != nil { + // Create closed channel for immediate replacement + closedChan := make(chan logRecord) + close(closedChan) + l.state.ActiveLogChannel.Store(closedChan) + + // Close the actual channel to signal processor + close(ch) + } + + // Wait for processor to exit (with timeout) + deadline := time.Now().Add(effectiveTimeout) + for time.Now().Before(deadline) { + if l.state.ProcessorExited.Load() { + break + } + time.Sleep(10 * time.Millisecond) + } + + if !l.state.ProcessorExited.Load() { + return fmtErrorf("processor did not exit within timeout (%v)", effectiveTimeout) + } + + return nil +} + // Shutdown gracefully closes the logger, attempting to flush pending records // If no timeout is provided, uses a default of 2x flush interval func (l *Logger) Shutdown(timeout ...time.Duration) error { - if !l.state.ShutdownCalled.CompareAndSwap(false, true) { return nil } @@ -119,35 +205,9 @@ func (l *Logger) Shutdown(timeout ...time.Duration) error { return nil } - l.initMu.Lock() - ch := l.getCurrentLogChannel() - closedChan := make(chan logRecord) - close(closedChan) - l.state.ActiveLogChannel.Store(closedChan) - if ch != closedChan { - close(ch) - } - l.initMu.Unlock() - - c := l.getConfig() - var effectiveTimeout time.Duration - if len(timeout) > 0 { - effectiveTimeout = timeout[0] - } else { - flushIntervalMs := c.FlushIntervalMs - // Default to 2x flush interval - effectiveTimeout = 2 * time.Duration(flushIntervalMs) * time.Millisecond - } - - deadline := time.Now().Add(effectiveTimeout) - pollInterval 
:= minWaitTime // Reasonable check period - processorCleanlyExited := false - for time.Now().Before(deadline) { - if l.state.ProcessorExited.Load() { - processorCleanlyExited = true - break - } - time.Sleep(pollInterval) + var stopErr error + if l.state.Started.Load() { + stopErr = l.Stop(timeout...) } l.state.IsInitialized.Store(false) @@ -168,9 +228,8 @@ func (l *Logger) Shutdown(timeout ...time.Duration) error { } } - if !processorCleanlyExited { - timeoutErr := fmtErrorf("logger processor did not exit within timeout (%v)", effectiveTimeout) - finalErr = combineErrors(finalErr, timeoutErr) + if stopErr != nil { + finalErr = combineErrors(finalErr, stopErr) } return finalErr @@ -181,9 +240,13 @@ func (l *Logger) Flush(timeout time.Duration) error { l.state.flushMutex.Lock() defer l.state.flushMutex.Unlock() + // State checks if !l.state.IsInitialized.Load() || l.state.ShutdownCalled.Load() { return fmtErrorf("logger not initialized or already shut down") } + if !l.state.Started.Load() { + return fmtErrorf("logger not started") + } // Create a channel to wait for confirmation from the processor confirmChan := make(chan struct{}) @@ -304,6 +367,18 @@ func (l *Logger) applyConfig(cfg *Config) error { // Get current state wasInitialized := l.state.IsInitialized.Load() + wasStarted := l.state.Started.Load() + + // Determine if restart is needed + needsRestart := wasStarted && wasInitialized && configRequiresRestart(oldCfg, cfg) + + // Stop processor if restart needed + if needsRestart { + if err := l.Stop(); err != nil { + l.currentConfig.Store(oldCfg) // Rollback + return fmtErrorf("failed to stop processor for restart: %w", err) + } + } // Get current file handle currentFilePtr := l.state.CurrentFile.Load() @@ -313,7 +388,10 @@ func (l *Logger) applyConfig(cfg *Config) error { } // Determine if we need a new file - needsNewFile := !wasInitialized || currentFile == nil + needsNewFile := !wasInitialized || currentFile == nil || + oldCfg.Directory != cfg.Directory || + 
oldCfg.Name != cfg.Name || + oldCfg.Extension != cfg.Extension // Handle file state transitions if cfg.DisableFile { @@ -351,27 +429,6 @@ func (l *Logger) applyConfig(cfg *Config) error { } } - // Close the old channel if reconfiguring - if wasInitialized { - oldCh := l.getCurrentLogChannel() - if oldCh != nil { - // Create new channel then close old channel - newLogChannel := make(chan logRecord, cfg.BufferSize) - l.state.ActiveLogChannel.Store(newLogChannel) - close(oldCh) - - // Start new processor with new channel - l.state.ProcessorExited.Store(false) - go l.processLogs(newLogChannel) - } - } else { - // Initial startup - newLogChannel := make(chan logRecord, cfg.BufferSize) - l.state.ActiveLogChannel.Store(newLogChannel) - l.state.ProcessorExited.Store(false) - go l.processLogs(newLogChannel) - } - // Setup stdout writer based on config if cfg.EnableStdout { var writer io.Writer @@ -388,8 +445,13 @@ func (l *Logger) applyConfig(cfg *Config) error { // Mark as initialized l.state.IsInitialized.Store(true) l.state.ShutdownCalled.Store(false) - l.state.DiskFullLogged.Store(false) - l.state.DiskStatusOK.Store(true) + // l.state.DiskFullLogged.Store(false) + // l.state.DiskStatusOK.Store(true) + + // Restart processor if it was running and needs restart + if needsRestart { + return l.Start() + } return nil } \ No newline at end of file diff --git a/logger_test.go b/logger_test.go index a6f1ee9..5dcbf6c 100644 --- a/logger_test.go +++ b/logger_test.go @@ -26,6 +26,10 @@ func createTestLogger(t *testing.T) (*Logger, string) { err := logger.ApplyConfig(cfg) require.NoError(t, err) + // Start the logger, which is the new requirement. 
+ err = logger.Start() + require.NoError(t, err) + return logger, tmpDir } @@ -46,6 +50,7 @@ func TestApplyConfig(t *testing.T) { assert.True(t, logger.state.IsInitialized.Load()) // Verify log file creation + // The file now contains "Logger started" logPath := filepath.Join(tmpDir, "log.log") _, err := os.Stat(logPath) assert.NoError(t, err) @@ -190,7 +195,9 @@ func TestLoggerFormats(t *testing.T) { name: "raw format", format: "raw", check: func(t *testing.T, content string) { - assert.Equal(t, "test message", strings.TrimSpace(content)) + // The "Logger started" message is also written in raw format. + // We just check that our test message is present in the output. + assert.Contains(t, content, "test message") }, }, } @@ -211,21 +218,19 @@ func TestLoggerFormats(t *testing.T) { err := logger.ApplyConfig(cfg) require.NoError(t, err) - // Small delay for reconfiguragion - time.Sleep(100 * time.Millisecond) + // Start the logger after configuring it. + err = logger.Start() + require.NoError(t, err) defer logger.Shutdown() logger.Info("test message") - // Small delay for log to be processed - time.Sleep(100 * time.Millisecond) - err = logger.Flush(time.Second) require.NoError(t, err) // Small delay for flush - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) content, err := os.ReadFile(filepath.Join(tmpDir, "log.log")) require.NoError(t, err) @@ -265,6 +270,8 @@ func TestLoggerStdoutMirroring(t *testing.T) { err := logger.ApplyConfig(cfg) require.NoError(t, err) + err = logger.Start() + require.NoError(t, err) defer logger.Shutdown() // Just verify it doesn't panic - actual stdout capture is complex @@ -277,16 +284,17 @@ func TestLoggerWrite(t *testing.T) { logger.Write("raw", "output", 123) - // Small delay for log process - time.Sleep(100 * time.Millisecond) - logger.Flush(time.Second) // Small delay for flush - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) content, err := os.ReadFile(filepath.Join(tmpDir, 
"log.log")) require.NoError(t, err) - assert.Equal(t, "raw output 123", string(content)) + // The file will contain the "Logger started" message first. + // We check that our raw output is also present. + // Since raw output doesn't add a newline, the file should end with our string. + assert.Contains(t, string(content), "raw output 123") + assert.True(t, strings.HasSuffix(string(content), "raw output 123")) } \ No newline at end of file diff --git a/processor.go b/processor.go index 0184756..54d4de6 100644 --- a/processor.go +++ b/processor.go @@ -94,10 +94,11 @@ func (l *Logger) processLogs(ch <-chan logRecord) { // processLogRecord handles individual log records, returning bytes written func (l *Logger) processLogRecord(record logRecord) int64 { c := l.getConfig() - // Check if the record should process this record disableFile := c.DisableFile if !disableFile && !l.state.DiskStatusOK.Load() { + // Simple increment of both counters l.state.DroppedLogs.Add(1) + l.state.TotalDroppedLogs.Add(1) return 0 } diff --git a/processor_test.go b/processor_test.go index 5581a0f..71ccee5 100644 --- a/processor_test.go +++ b/processor_test.go @@ -2,8 +2,10 @@ package log import ( + "encoding/json" "os" "path/filepath" + "strings" "testing" "time" @@ -18,7 +20,8 @@ func TestLoggerHeartbeat(t *testing.T) { cfg := logger.GetConfig() cfg.HeartbeatLevel = 3 // All heartbeats cfg.HeartbeatIntervalS = 1 - logger.ApplyConfig(cfg) + err := logger.ApplyConfig(cfg) + require.NoError(t, err) // Wait for heartbeats time.Sleep(1500 * time.Millisecond) @@ -42,24 +45,55 @@ func TestDroppedLogs(t *testing.T) { cfg := DefaultConfig() cfg.Directory = t.TempDir() cfg.BufferSize = 1 // Very small buffer - cfg.FlushIntervalMs = 1000 // Slow flush + cfg.FlushIntervalMs = 10 // Fast processing + cfg.HeartbeatLevel = 1 // Enable proc heartbeat + cfg.HeartbeatIntervalS = 1 // Fast heartbeat err := logger.ApplyConfig(cfg) require.NoError(t, err) + + err = logger.Start() + require.NoError(t, err) defer 
logger.Shutdown() - // Flood the logger + // Flood to guarantee drops for i := 0; i < 100; i++ { logger.Info("flood", i) } - // Let it process - time.Sleep(100 * time.Millisecond) + // Wait for first heartbeat + time.Sleep(1500 * time.Millisecond) - // Check drop counter - dropped := logger.state.DroppedLogs.Load() - // Some logs should have been dropped with buffer size 1 - assert.Greater(t, dropped, uint64(0)) + // Flood again + for i := 0; i < 50; i++ { + logger.Info("flood2", i) + } + + // Wait for second heartbeat + time.Sleep(1000 * time.Millisecond) + logger.Flush(time.Second) + + // Read log file and verify heartbeats + content, err := os.ReadFile(filepath.Join(cfg.Directory, "log.log")) + require.NoError(t, err) + + lines := strings.Split(string(content), "\n") + foundTotal := false + foundInterval := false + + for _, line := range lines { + if strings.Contains(line, "PROC") { + if strings.Contains(line, "total_dropped_logs") { + foundTotal = true + } + if strings.Contains(line, "dropped_since_last") { + foundInterval = true + } + } + } + + assert.True(t, foundTotal, "Expected PROC heartbeat with total_dropped_logs") + assert.True(t, foundInterval, "Expected PROC heartbeat with dropped_since_last") } func TestAdaptiveDiskCheck(t *testing.T) { @@ -71,7 +105,8 @@ func TestAdaptiveDiskCheck(t *testing.T) { cfg.DiskCheckIntervalMs = 100 cfg.MinCheckIntervalMs = 50 cfg.MaxCheckIntervalMs = 500 - logger.ApplyConfig(cfg) + err := logger.ApplyConfig(cfg) + require.NoError(t, err) // Generate varying log rates and verify no panic for i := 0; i < 10; i++ { @@ -85,4 +120,93 @@ func TestAdaptiveDiskCheck(t *testing.T) { } logger.Flush(time.Second) +} + +func TestDroppedLogRecoveryOnDroppedHeartbeat(t *testing.T) { + logger := NewLogger() + + cfg := DefaultConfig() + cfg.Directory = t.TempDir() + cfg.BufferSize = 10 // Small buffer + cfg.HeartbeatLevel = 1 // Enable proc heartbeat + cfg.HeartbeatIntervalS = 1 // Fast heartbeat + cfg.Format = "json" // Use JSON for easy 
parsing + + err := logger.ApplyConfig(cfg) + require.NoError(t, err) + + err = logger.Start() + require.NoError(t, err) + defer logger.Shutdown() + + // 1. Flood the logger to guarantee drops. Let's aim to drop exactly 50 logs. + const floodCount = 50 + for i := 0; i < int(cfg.BufferSize)+floodCount; i++ { + logger.Info("flood", i) + } + + // Wait for the first heartbeat to be generated. It will carry the count of ~50 drops. + time.Sleep(1100 * time.Millisecond) + + // 2. Immediately put the logger into a "disk full" state. + // This will cause the processor to drop the first heartbeat record. + diskFullCfg := logger.GetConfig() + diskFullCfg.MinDiskFreeKB = 9999999999 + err = logger.ApplyConfig(diskFullCfg) + require.NoError(t, err) + // Force a disk check to ensure the state is updated to not OK. + logger.performDiskCheck(true) + assert.False(t, logger.state.DiskStatusOK.Load(), "Disk status should be not OK") + + // 3. Now, "fix" the disk so the next heartbeat can be written successfully. + diskOKCfg := logger.GetConfig() + diskOKCfg.MinDiskFreeKB = 0 + err = logger.ApplyConfig(diskOKCfg) + require.NoError(t, err) + logger.performDiskCheck(true) // Ensure state is updated back to OK. + assert.True(t, logger.state.DiskStatusOK.Load(), "Disk status should be OK") + + // 4. Wait for the second heartbeat to be generated and written to the file. + time.Sleep(1100 * time.Millisecond) + logger.Flush(time.Second) + + // 5. Verify the log file content. + content, err := os.ReadFile(filepath.Join(cfg.Directory, "log.log")) + require.NoError(t, err) + + var foundHeartbeat bool + var intervalDropCount, totalDropCount float64 + lines := strings.Split(string(content), "\n") + + for _, line := range lines { + // Find the last valid heartbeat with drop stats. 
+ if strings.Contains(line, `"level":"PROC"`) && strings.Contains(line, "dropped_since_last") { + foundHeartbeat = true + var entry map[string]interface{} + err := json.Unmarshal([]byte(line), &entry) + require.NoError(t, err, "Failed to parse heartbeat log line: %s", line) + + fields := entry["fields"].([]interface{}) + for i := 0; i < len(fields)-1; i += 2 { + if key, ok := fields[i].(string); ok { + if key == "dropped_since_last" { + intervalDropCount, _ = fields[i+1].(float64) + } + if key == "total_dropped_logs" { + totalDropCount, _ = fields[i+1].(float64) + } + } + } + } + } + + require.True(t, foundHeartbeat, "Did not find the final heartbeat with drop stats") + + // ASSERT THE CURRENT BEHAVIOR: + // The 'dropped_since_last' count from the first heartbeat (~50) was lost when that heartbeat was dropped. + // The only new drop in the next interval was the heartbeat record itself. + assert.Equal(t, float64(1), intervalDropCount, "The interval drop count should only reflect the single dropped heartbeat from the previous interval.") + + // The 'total_dropped_logs' counter should be accurate, reflecting the initial flood (~50) + the one dropped heartbeat. 
+ assert.True(t, totalDropCount >= float64(floodCount), "Total drop count should be at least the number of flooded logs plus the dropped heartbeat.") } \ No newline at end of file diff --git a/record.go b/record.go index 1836a14..7aa8d6f 100644 --- a/record.go +++ b/record.go @@ -32,13 +32,15 @@ func (l *Logger) getFlags() int64 { func (l *Logger) sendLogRecord(record logRecord) { defer func() { if r := recover(); r != nil { // Catch panic on send to closed channel - l.handleFailedSend(record) + l.handleFailedSend() } }() - if l.state.ShutdownCalled.Load() || l.state.LoggerDisabled.Load() { + if l.state.ShutdownCalled.Load() || + l.state.LoggerDisabled.Load() || + !l.state.Started.Load() { // Process drops even if logger is disabled or shutting down - l.handleFailedSend(record) + l.handleFailedSend() return } @@ -47,51 +49,42 @@ func (l *Logger) sendLogRecord(record logRecord) { // Non-blocking send select { case ch <- record: - // Success: record sent, channel was not full, check if log drops need to be reported - if record.unreportedDrops == 0 { - // Get number of dropped logs and reset the counter to zero - droppedCount := l.state.DroppedLogs.Swap(0) - - if droppedCount > 0 { - // Dropped logs report - dropRecord := logRecord{ - Flags: FlagDefault, - TimeStamp: time.Now(), - Level: LevelError, - Args: []any{"Logs were dropped", "dropped_count", droppedCount}, - unreportedDrops: droppedCount, // Carry the count for recovery - } - // No success check is required, count is restored if it fails - l.sendLogRecord(dropRecord) - } - } + // Success default: - l.handleFailedSend(record) + l.handleFailedSend() } } -// handleFailedSend restores or increments drop counter -func (l *Logger) handleFailedSend(record logRecord) { - // For regular record, add 1 to dropped log count - // For drop report, restore the count - amountToAdd := uint64(1) - if record.unreportedDrops > 0 { - amountToAdd = record.unreportedDrops - } - l.state.DroppedLogs.Add(amountToAdd) +// 
handleFailedSend increments drop counters +func (l *Logger) handleFailedSend() { + l.state.DroppedLogs.Add(1) // Interval counter + l.state.TotalDroppedLogs.Add(1) // Total counter } // log handles the core logging logic func (l *Logger) log(flags int64, level int64, depth int64, args ...any) { + // State checks if !l.state.IsInitialized.Load() { return } + if !l.state.Started.Load() { + // Log to internal error channel if configured + cfg := l.getConfig() + if cfg.InternalErrorsToStderr { + l.internalLog("warning - logger not started, dropping log entry\n") + } + return + } + + // Discard or proceed based on level cfg := l.getConfig() if level < cfg.Level { return } + // Get trace info from runtime + // Depth filter hard-coded based on call stack of current package design var trace string if depth > 0 { const skipTrace = 3 // log.Info -> log -> getTrace (Adjust if call stack changes) @@ -99,12 +92,11 @@ func (l *Logger) log(flags int64, level int64, depth int64, args ...any) { } record := logRecord{ - Flags: flags, - TimeStamp: time.Now(), - Level: level, - Trace: trace, - Args: args, - unreportedDrops: 0, // 0 for regular logs + Flags: flags, + TimeStamp: time.Now(), + Level: level, + Trace: trace, + Args: args, } l.sendLogRecord(record) } diff --git a/state.go b/state.go index 006fb0f..7339266 100644 --- a/state.go +++ b/state.go @@ -9,11 +9,12 @@ import ( // State encapsulates the runtime state of the logger type State struct { // General state - IsInitialized atomic.Bool - LoggerDisabled atomic.Bool + IsInitialized atomic.Bool // Tracks successful initialization, not start of log processor + LoggerDisabled atomic.Bool // Tracks logger stop due to issues (e.g. 
disk full) ShutdownCalled atomic.Bool DiskFullLogged atomic.Bool DiskStatusOK atomic.Bool + Started atomic.Bool // Tracks calls to Start() and Stop() ProcessorExited atomic.Bool // Tracks if the processor goroutine is running or has exited // Flushing state @@ -30,7 +31,8 @@ type State struct { // Log state ActiveLogChannel atomic.Value // stores chan logRecord - DroppedLogs atomic.Uint64 // Counter for logs dropped + DroppedLogs atomic.Uint64 // Counter for logs dropped since last heartbeat + TotalDroppedLogs atomic.Uint64 // Counter for total logs dropped since logger start // Heartbeat statistics HeartbeatSequence atomic.Uint64 // Counter for heartbeat sequence numbers diff --git a/storage.go b/storage.go index 8821b77..a7df17c 100644 --- a/storage.go +++ b/storage.go @@ -70,9 +70,7 @@ func (l *Logger) performDiskCheck(forceCleanup bool) bool { freeSpace, err := l.getDiskFreeSpace(dir) if err != nil { l.internalLog("warning - failed to check free disk space for '%s': %v\n", dir, err) - if l.state.DiskStatusOK.Load() { - l.state.DiskStatusOK.Store(false) - } + l.state.DiskStatusOK.Store(false) return false } @@ -110,9 +108,7 @@ func (l *Logger) performDiskCheck(forceCleanup bool) bool { } l.sendLogRecord(diskFullRecord) } - if l.state.DiskStatusOK.Load() { - l.state.DiskStatusOK.Store(false) - } + l.state.DiskStatusOK.Store(false) return false } // Cleanup succeeded diff --git a/type.go b/type.go index 6169b98..edfb19d 100644 --- a/type.go +++ b/type.go @@ -8,12 +8,11 @@ import ( // logRecord represents a single log entry. type logRecord struct { - Flags int64 - TimeStamp time.Time - Level int64 - Trace string - Args []any - unreportedDrops uint64 // Dropped log tracker + Flags int64 + TimeStamp time.Time + Level int64 + Trace string + Args []any } // TimerSet holds all timers used in processLogs