v0.2.2 minor fixes
This commit is contained in:
@ -87,11 +87,6 @@ func validateLogConfig(cfg *LogConfig) error {
|
|||||||
return fmt.Errorf("invalid console target: %s", cfg.Console.Target)
|
return fmt.Errorf("invalid console target: %s", cfg.Console.Target)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: check if file output check is correct
|
|
||||||
if cfg.Console.Target == "split" && cfg.Output == "file" {
|
|
||||||
return fmt.Errorf("console target 'split' requires output mode 'stdout', 'stderr', or 'both'")
|
|
||||||
}
|
|
||||||
|
|
||||||
validFormats := map[string]bool{
|
validFormats := map[string]bool{
|
||||||
"txt": true, "json": true, "": true,
|
"txt": true, "json": true, "": true,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -85,7 +85,7 @@ func (p *Pipeline) Shutdown() {
|
|||||||
// GetStats returns pipeline statistics
|
// GetStats returns pipeline statistics
|
||||||
func (p *Pipeline) GetStats() map[string]any {
|
func (p *Pipeline) GetStats() map[string]any {
|
||||||
// Recovery to handle concurrent access during shutdown
|
// Recovery to handle concurrent access during shutdown
|
||||||
// TODO: check if needed to keep
|
// When service is shutting down, sources/sinks might be nil or partially stopped
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
p.logger.Error("msg", "Panic getting pipeline stats",
|
p.logger.Error("msg", "Panic getting pipeline stats",
|
||||||
|
|||||||
@ -144,13 +144,23 @@ func (s *Service) wirePipeline(p *Pipeline) {
|
|||||||
defer p.wg.Done()
|
defer p.wg.Done()
|
||||||
|
|
||||||
// Panic recovery to prevent single source from crashing pipeline
|
// Panic recovery to prevent single source from crashing pipeline
|
||||||
// TODO: check if failed pipeline is properly shut down
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
s.logger.Error("msg", "Panic in pipeline processing",
|
s.logger.Error("msg", "Panic in pipeline processing",
|
||||||
"pipeline", p.Name,
|
"pipeline", p.Name,
|
||||||
"source", source.GetStats().Type,
|
"source", source.GetStats().Type,
|
||||||
"panic", r)
|
"panic", r)
|
||||||
|
|
||||||
|
// Ensure failed pipelines don't leave resources hanging
|
||||||
|
go func() {
|
||||||
|
s.logger.Warn("msg", "Shutting down pipeline due to panic",
|
||||||
|
"pipeline", p.Name)
|
||||||
|
if err := s.RemovePipeline(p.Name); err != nil {
|
||||||
|
s.logger.Error("msg", "Failed to remove panicked pipeline",
|
||||||
|
"pipeline", p.Name,
|
||||||
|
"error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user