v0.3.9 config auto-reload added
@@ -20,6 +20,7 @@ Application Control:

Runtime Behavior:
  --disable-status-reporter   Disable the periodic status reporter.
  --config-auto-reload        Enable config reload and pipeline reconfiguration on config file change.

Configuration Sources (Precedence: CLI > Env > File > Defaults):
  - CLI flags override all other settings.
@@ -84,22 +84,54 @@ func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Setup signal handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
	// Service and hot reload management
	var reloadManager *ReloadManager

	if cfg.ConfigAutoReload && cfg.ConfigFile != "" {
		// Use reload manager for dynamic configuration
		logger.Info("msg", "Config auto-reload enabled",
			"config_file", cfg.ConfigFile)

		reloadManager = NewReloadManager(cfg.ConfigFile, cfg, logger)

		if err := reloadManager.Start(ctx); err != nil {
			logger.Error("msg", "Failed to start reload manager", "error", err)
			os.Exit(1)
		}
		defer reloadManager.Shutdown()

		// Setup signal handler with reload support
		signalHandler := NewSignalHandler(reloadManager, logger)
		defer signalHandler.Stop()

		// Handle signals in background
		go func() {
			sig := signalHandler.Handle(ctx)
			if sig != nil {
				logger.Info("msg", "Shutdown signal received",
					"signal", sig)
				cancel() // Trigger shutdown
			}
		}()
	} else {
		// Traditional static bootstrap
		logger.Info("msg", "Config auto-reload disabled")

		// Bootstrap the service
		svc, router, err := bootstrapService(ctx, cfg)
		if err != nil {
			logger.Error("msg", "Failed to bootstrap service", "error", err)
			os.Exit(1)
		}

		// Start status reporter if enabled
		// Start status reporter if enabled (static mode)
		if !cfg.DisableStatusReporter {
			go statusReporter(svc)
			go statusReporter(svc, ctx)
		}

		// Setup traditional signal handling
		sigChan := make(chan os.Signal, 1)
		signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)

		// Wait for shutdown signal
		sig := <-sigChan

@@ -133,6 +165,15 @@ func main() {
			logger.Error("msg", "Shutdown timeout exceeded - forcing exit")
			os.Exit(1)
		}

		return // Exit from static mode
	}

	// Wait for context cancellation
	<-ctx.Done()

	// Shutdown is handled by ReloadManager.Shutdown() in defer
	logger.Info("msg", "Shutdown complete")
}

func shutdownLogger() {
src/cmd/logwisp/reload.go (new file, 354 lines)
@@ -0,0 +1,354 @@
// FILE: src/cmd/logwisp/reload.go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"logwisp/src/internal/config"
	"logwisp/src/internal/service"

	lconfig "github.com/lixenwraith/config"
	"github.com/lixenwraith/log"
)

// ReloadManager handles configuration hot reload
type ReloadManager struct {
	configPath  string
	service     *service.Service
	router      *service.HTTPRouter
	cfg         *config.Config
	lcfg        *lconfig.Config // TODO: use the same cfg struct
	logger      *log.Logger
	mu          sync.RWMutex
	reloadingMu sync.Mutex
	isReloading bool
	shutdownCh  chan struct{}
	wg          sync.WaitGroup

	// Status reporter management
	statusReporterCancel context.CancelFunc
	statusReporterMu     sync.Mutex
}

// NewReloadManager creates a new reload manager
func NewReloadManager(configPath string, initialCfg *config.Config, logger *log.Logger) *ReloadManager {
	return &ReloadManager{
		configPath: configPath,
		cfg:        initialCfg,
		logger:     logger,
		shutdownCh: make(chan struct{}),
	}
}

// Start begins watching for configuration changes
func (rm *ReloadManager) Start(ctx context.Context) error {
	// Bootstrap initial service
	svc, router, err := bootstrapService(ctx, rm.cfg)
	if err != nil {
		return fmt.Errorf("failed to bootstrap initial service: %w", err)
	}

	rm.mu.Lock()
	rm.service = svc
	rm.router = router
	rm.mu.Unlock()

	// Start status reporter for initial service
	if !rm.cfg.DisableStatusReporter {
		rm.startStatusReporter(ctx, svc)
	}

	// Create lconfig instance for file watching
	lcfg, err := lconfig.NewBuilder().
		WithFile(rm.configPath).
		WithTarget(rm.cfg).
		Build()
	if err != nil {
		return fmt.Errorf("failed to create config watcher: %w", err)
	}

	rm.lcfg = lcfg

	// Enable auto-update with custom options
	watchOpts := lconfig.WatchOptions{
		PollInterval:      time.Second,
		Debounce:          500 * time.Millisecond,
		ReloadTimeout:     30 * time.Second,
		VerifyPermissions: true, // SECURITY: Prevent malicious config replacement
	}
	lcfg.AutoUpdateWithOptions(watchOpts)

	// Start watching for changes
	rm.wg.Add(1)
	go rm.watchLoop(ctx)

	rm.logger.Info("msg", "Configuration hot reload enabled",
		"config_file", rm.configPath)

	return nil
}

// watchLoop monitors configuration changes
func (rm *ReloadManager) watchLoop(ctx context.Context) {
	defer rm.wg.Done()

	changeCh := rm.lcfg.Watch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-rm.shutdownCh:
			return
		case changedPath := <-changeCh:
			// Handle special notifications
			switch changedPath {
			case "file_deleted":
				rm.logger.Error("msg", "Configuration file deleted",
					"action", "keeping current configuration")
				continue
			case "permissions_changed":
				// SECURITY: Config file permissions changed suspiciously
				rm.logger.Error("msg", "Configuration file permissions changed",
					"action", "reload blocked for security")
				continue
			case "reload_timeout":
				rm.logger.Error("msg", "Configuration reload timed out",
					"action", "keeping current configuration")
				continue
			default:
				if strings.HasPrefix(changedPath, "reload_error:") {
					rm.logger.Error("msg", "Configuration reload error",
						"error", strings.TrimPrefix(changedPath, "reload_error:"),
						"action", "keeping current configuration")
					continue
				}
			}

			// Trigger reload for any pipeline-related change
			if rm.shouldReload(changedPath) {
				rm.triggerReload(ctx)
			}
		}
	}
}

// shouldReload determines if a config change requires service reload
func (rm *ReloadManager) shouldReload(path string) bool {
	// Pipeline changes always require reload
	if strings.HasPrefix(path, "pipelines.") || path == "pipelines" {
		return true
	}

	// Router mode changes require reload
	if path == "router" || path == "use_router" {
		return true
	}

	// Logging changes don't require service reload
	if strings.HasPrefix(path, "logging.") {
		return false
	}

	// Status reporter changes
	if path == "disable_status_reporter" {
		return true
	}

	return false
}

// triggerReload performs the actual reload
func (rm *ReloadManager) triggerReload(ctx context.Context) {
	// Prevent concurrent reloads
	rm.reloadingMu.Lock()
	if rm.isReloading {
		rm.reloadingMu.Unlock()
		rm.logger.Debug("msg", "Reload already in progress, skipping")
		return
	}
	rm.isReloading = true
	rm.reloadingMu.Unlock()

	defer func() {
		rm.reloadingMu.Lock()
		rm.isReloading = false
		rm.reloadingMu.Unlock()
	}()

	rm.logger.Info("msg", "Starting configuration hot reload")

	// Create reload context with timeout
	reloadCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if err := rm.performReload(reloadCtx); err != nil {
		rm.logger.Error("msg", "Hot reload failed",
			"error", err,
			"action", "keeping current configuration and services")
		return
	}

	rm.logger.Info("msg", "Configuration hot reload completed successfully")
}

// performReload executes the reload process
func (rm *ReloadManager) performReload(ctx context.Context) error {
	// Get updated config from lconfig
	updatedCfg, err := rm.lcfg.AsStruct()
	if err != nil {
		return fmt.Errorf("failed to get updated config: %w", err)
	}

	newCfg := updatedCfg.(*config.Config)

	// Get current service snapshot
	rm.mu.RLock()
	oldService := rm.service
	oldRouter := rm.router
	rm.mu.RUnlock()

	// Try to bootstrap with new configuration
	rm.logger.Debug("msg", "Bootstrapping new service with updated config")
	newService, newRouter, err := bootstrapService(ctx, newCfg)
	if err != nil {
		// Bootstrap failed - keep old services running
		return fmt.Errorf("failed to bootstrap new service (old service still active): %w", err)
	}

	// Bootstrap succeeded - swap services atomically
	rm.mu.Lock()
	rm.service = newService
	rm.router = newRouter
	rm.cfg = newCfg
	rm.mu.Unlock()

	// Stop old status reporter and start new one
	rm.restartStatusReporter(ctx, newService)

	// Gracefully shutdown old services
	// This happens after the swap to minimize downtime
	go rm.shutdownOldServices(oldRouter, oldService)

	return nil
}

// shutdownOldServices gracefully shuts down old services
func (rm *ReloadManager) shutdownOldServices(router *service.HTTPRouter, svc *service.Service) {
	// Give connections time to drain
	rm.logger.Debug("msg", "Draining connections from old services")
	time.Sleep(2 * time.Second)

	if router != nil {
		rm.logger.Info("msg", "Shutting down old router")
		router.Shutdown()
	}

	if svc != nil {
		rm.logger.Info("msg", "Shutting down old service")
		svc.Shutdown()
	}

	rm.logger.Debug("msg", "Old services shutdown complete")
}

// startStatusReporter starts a new status reporter
func (rm *ReloadManager) startStatusReporter(ctx context.Context, svc *service.Service) {
	rm.statusReporterMu.Lock()
	defer rm.statusReporterMu.Unlock()

	// Create cancellable context for status reporter
	reporterCtx, cancel := context.WithCancel(ctx)
	rm.statusReporterCancel = cancel

	go statusReporter(svc, reporterCtx)
	rm.logger.Debug("msg", "Started status reporter")
}

// restartStatusReporter stops old and starts new status reporter
func (rm *ReloadManager) restartStatusReporter(ctx context.Context, newService *service.Service) {
	if rm.cfg.DisableStatusReporter {
		// Just stop the old one if disabled
		rm.stopStatusReporter()
		return
	}

	rm.statusReporterMu.Lock()
	defer rm.statusReporterMu.Unlock()

	// Stop old reporter
	if rm.statusReporterCancel != nil {
		rm.statusReporterCancel()
		rm.logger.Debug("msg", "Stopped old status reporter")
	}

	// Start new reporter
	reporterCtx, cancel := context.WithCancel(ctx)
	rm.statusReporterCancel = cancel

	go statusReporter(newService, reporterCtx)
	rm.logger.Debug("msg", "Started new status reporter")
}

// stopStatusReporter stops the status reporter
func (rm *ReloadManager) stopStatusReporter() {
	rm.statusReporterMu.Lock()
	defer rm.statusReporterMu.Unlock()

	if rm.statusReporterCancel != nil {
		rm.statusReporterCancel()
		rm.statusReporterCancel = nil
		rm.logger.Debug("msg", "Stopped status reporter")
	}
}

// Shutdown stops the reload manager
func (rm *ReloadManager) Shutdown() {
	rm.logger.Info("msg", "Shutting down reload manager")

	// Stop status reporter
	rm.stopStatusReporter()

	// Stop watching
	close(rm.shutdownCh)
	rm.wg.Wait()

	// Stop config watching
	if rm.lcfg != nil {
		rm.lcfg.StopAutoUpdate()
	}

	// Shutdown current services
	rm.mu.RLock()
	currentRouter := rm.router
	currentService := rm.service
	rm.mu.RUnlock()

	if currentRouter != nil {
		rm.logger.Info("msg", "Shutting down router")
		currentRouter.Shutdown()
	}

	if currentService != nil {
		rm.logger.Info("msg", "Shutting down service")
		currentService.Shutdown()
	}
}

// GetService returns the current service (thread-safe)
func (rm *ReloadManager) GetService() *service.Service {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return rm.service
}

// GetRouter returns the current router (thread-safe)
func (rm *ReloadManager) GetRouter() *service.HTTPRouter {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return rm.router
}
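Because performReload swaps rm.service and rm.router under rm.mu, code that holds onto the ReloadManager should fetch the current instances through GetService or GetRouter at the point of use rather than caching a pointer at startup. A minimal sketch of that pattern, assumed to live in the same package; the polling helper and its interval are illustrative and not part of the commit, only the accessor comes from reload.go:

// Sketch: look up the current service each time it is needed so a hot
// reload is picked up automatically. watchCurrentService is hypothetical.
func watchCurrentService(ctx context.Context, rm *ReloadManager) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			svc := rm.GetService() // may point to a new instance after a reload
			if svc == nil {
				continue
			}
			_ = svc // use for this iteration only; do not retain across ticks
		}
	}
}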
src/cmd/logwisp/signal.go (new file, 65 lines)
@@ -0,0 +1,65 @@
// FILE: src/cmd/logwisp/signal.go
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/lixenwraith/log"
)

// SignalHandler manages OS signals
type SignalHandler struct {
	reloadManager *ReloadManager
	logger        *log.Logger
	sigChan       chan os.Signal
}

// NewSignalHandler creates a signal handler
func NewSignalHandler(rm *ReloadManager, logger *log.Logger) *SignalHandler {
	sh := &SignalHandler{
		reloadManager: rm,
		logger:        logger,
		sigChan:       make(chan os.Signal, 1),
	}

	// Register for signals
	signal.Notify(sh.sigChan,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGHUP,  // Traditional reload signal
		syscall.SIGUSR1, // Alternative reload signal
	)

	return sh
}

// Handle processes signals
func (sh *SignalHandler) Handle(ctx context.Context) os.Signal {
	for {
		select {
		case sig := <-sh.sigChan:
			switch sig {
			case syscall.SIGHUP, syscall.SIGUSR1:
				sh.logger.Info("msg", "Reload signal received",
					"signal", sig)
				// Trigger manual reload
				go sh.reloadManager.triggerReload(ctx)
				// Continue handling signals
			default:
				// Return termination signals
				return sig
			}
		case <-ctx.Done():
			return nil
		}
	}
}

// Stop cleans up signal handling
func (sh *SignalHandler) Stop() {
	signal.Stop(sh.sigChan)
	close(sh.sigChan)
}
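Since Handle treats SIGHUP and SIGUSR1 as reload triggers rather than termination signals, a configuration reload can be forced without restarting the process, for example by sending SIGHUP to the running logwisp process. A minimal in-process sketch, assumed to live in the same package; the helper name is hypothetical and syscall.Kill is Unix-only:

// Sketch: signal the current process so the SignalHandler above calls
// triggerReload. requestReload is a hypothetical helper, e.g. for tests.
func requestReload() error {
	return syscall.Kill(os.Getpid(), syscall.SIGHUP)
}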
@@ -2,6 +2,7 @@
package main

import (
	"context"
	"fmt"
	"time"

@@ -10,14 +11,16 @@ import (
)

// statusReporter periodically logs service status
func statusReporter(service *service.Service) {
func statusReporter(service *service.Service, ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Clean shutdown
			return
		case <-ticker.C:
			// ⚠️ FIXED: Add nil check and safe access for service stats
			if service == nil {
				logger.Warn("msg", "Status reporter: service is nil",
					"component", "status_reporter")
@@ -10,6 +10,7 @@ type Config struct {

	// Runtime behavior flags
	DisableStatusReporter bool `toml:"disable_status_reporter"`
	ConfigAutoReload      bool `toml:"config_auto_reload"`

	// Internal flag indicating daemonized child process
	BackgroundDaemon bool `toml:"background-daemon"`

@@ -26,6 +26,7 @@ func defaults() *Config {

	// Runtime behavior defaults
	DisableStatusReporter: false,
	ConfigAutoReload:      false,

	// Child process indicator
	BackgroundDaemon: false,
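The new field maps to the config_auto_reload TOML key through the struct tag above, next to disable_status_reporter. A rough sketch of exercising both keys with the builder calls shown in reload.go; the temporary file, its name, and the exampleAutoReloadConfig helper are assumptions for illustration, not part of the commit:

// Sketch: write a minimal config using the new keys and build a watcher
// for it with the same lconfig calls used by ReloadManager.Start.
package main

import (
	"os"
	"path/filepath"

	"logwisp/src/internal/config"

	lconfig "github.com/lixenwraith/config"
)

func exampleAutoReloadConfig() (*lconfig.Config, error) {
	// Keys mirror the toml struct tags added in this commit.
	body := []byte("disable_status_reporter = false\nconfig_auto_reload = true\n")

	path := filepath.Join(os.TempDir(), "logwisp-example.toml") // hypothetical path
	if err := os.WriteFile(path, body, 0o600); err != nil {
		return nil, err
	}

	cfg := &config.Config{}
	return lconfig.NewBuilder().
		WithFile(path).
		WithTarget(cfg).
		Build()
}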