v0.2.0 restructured to pipeline architecture, dirty

This commit is contained in:
2025-07-11 04:52:41 -04:00
parent 5936f82970
commit b503816de3
51 changed files with 4132 additions and 5936 deletions

View File

@ -9,254 +9,285 @@ import (
"logwisp/src/internal/config"
"logwisp/src/internal/filter"
"logwisp/src/internal/monitor"
"logwisp/src/internal/transport"
"logwisp/src/internal/sink"
"logwisp/src/internal/source"
"github.com/lixenwraith/log"
)
// Service manages multiple pipelines
type Service struct {
streams map[string]*LogStream
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *log.Logger
pipelines map[string]*Pipeline
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *log.Logger
}
// New creates a new service
func New(ctx context.Context, logger *log.Logger) *Service {
serviceCtx, cancel := context.WithCancel(ctx)
return &Service{
streams: make(map[string]*LogStream),
ctx: serviceCtx,
cancel: cancel,
logger: logger,
pipelines: make(map[string]*Pipeline),
ctx: serviceCtx,
cancel: cancel,
logger: logger,
}
}
func (s *Service) CreateStream(cfg config.StreamConfig) error {
// NewPipeline creates and starts a new pipeline
func (s *Service) NewPipeline(cfg config.PipelineConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.streams[cfg.Name]; exists {
err := fmt.Errorf("transport '%s' already exists", cfg.Name)
s.logger.Error("msg", "Failed to create stream - duplicate name",
if _, exists := s.pipelines[cfg.Name]; exists {
err := fmt.Errorf("pipeline '%s' already exists", cfg.Name)
s.logger.Error("msg", "Failed to create pipeline - duplicate name",
"component", "service",
"stream", cfg.Name,
"pipeline", cfg.Name,
"error", err)
return err
}
s.logger.Debug("msg", "Creating stream", "stream", cfg.Name)
s.logger.Debug("msg", "Creating pipeline", "pipeline", cfg.Name)
// Create transport context
streamCtx, streamCancel := context.WithCancel(s.ctx)
// Create pipeline context
pipelineCtx, pipelineCancel := context.WithCancel(s.ctx)
// Create monitor - pass the service logger directly
mon := monitor.New(s.logger)
mon.SetCheckInterval(time.Duration(cfg.GetCheckInterval(100)) * time.Millisecond)
// Add targets
for _, target := range cfg.GetTargets(nil) {
if err := mon.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil {
streamCancel()
return fmt.Errorf("failed to add target %s: %w", target.Path, err)
}
// Create pipeline instance
pipeline := &Pipeline{
Name: cfg.Name,
Config: cfg,
Stats: &PipelineStats{
StartTime: time.Now(),
},
ctx: pipelineCtx,
cancel: pipelineCancel,
logger: s.logger,
}
// Start monitor
if err := mon.Start(streamCtx); err != nil {
streamCancel()
s.logger.Error("msg", "Failed to start monitor",
"component", "service",
"stream", cfg.Name,
"error", err)
return fmt.Errorf("failed to start monitor: %w", err)
// Create sources
for i, srcCfg := range cfg.Sources {
src, err := s.createSource(srcCfg)
if err != nil {
pipelineCancel()
return fmt.Errorf("failed to create source[%d]: %w", i, err)
}
pipeline.Sources = append(pipeline.Sources, src)
}
// Create filter chain
var filterChain *filter.Chain
if len(cfg.Filters) > 0 {
chain, err := filter.NewChain(cfg.Filters, s.logger)
if err != nil {
streamCancel()
s.logger.Error("msg", "Failed to create filter chain",
"component", "service",
"stream", cfg.Name,
"filter_count", len(cfg.Filters),
"error", err)
pipelineCancel()
return fmt.Errorf("failed to create filter chain: %w", err)
}
filterChain = chain
pipeline.FilterChain = chain
}
// Create log transport
ls := &LogStream{
Name: cfg.Name,
Config: cfg,
Monitor: mon,
FilterChain: filterChain,
Stats: &StreamStats{
StartTime: time.Now(),
},
ctx: streamCtx,
cancel: streamCancel,
logger: s.logger, // Use parent logger
}
// Create sinks
for i, sinkCfg := range cfg.Sinks {
sinkInst, err := s.createSink(sinkCfg)
if err != nil {
pipelineCancel()
return fmt.Errorf("failed to create sink[%d]: %w", i, err)
}
pipeline.Sinks = append(pipeline.Sinks, sinkInst)
// Start TCP server if configured
if cfg.TCPServer != nil && cfg.TCPServer.Enabled {
// Create filtered channel
rawChan := mon.Subscribe()
tcpChan := make(chan monitor.LogEntry, cfg.TCPServer.BufferSize)
// Start filter goroutine for TCP
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer close(tcpChan)
s.filterLoop(streamCtx, rawChan, tcpChan, filterChain)
}()
ls.TCPServer = transport.NewTCPStreamer(
tcpChan,
*cfg.TCPServer,
s.logger) // Pass parent logger
if err := s.startTCPServer(ls); err != nil {
ls.Shutdown()
s.logger.Error("msg", "Failed to start TCP server",
"component", "service",
"stream", cfg.Name,
"port", cfg.TCPServer.Port,
"error", err)
return fmt.Errorf("TCP server failed: %w", err)
// Track HTTP/TCP sinks for router mode
switch s := sinkInst.(type) {
case *sink.HTTPSink:
pipeline.HTTPSinks = append(pipeline.HTTPSinks, s)
case *sink.TCPSink:
pipeline.TCPSinks = append(pipeline.TCPSinks, s)
}
}
// Start HTTP server if configured
if cfg.HTTPServer != nil && cfg.HTTPServer.Enabled {
// Create filtered channel
rawChan := mon.Subscribe()
httpChan := make(chan monitor.LogEntry, cfg.HTTPServer.BufferSize)
// Start filter goroutine for HTTP
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer close(httpChan)
s.filterLoop(streamCtx, rawChan, httpChan, filterChain)
}()
ls.HTTPServer = transport.NewHTTPStreamer(
httpChan,
*cfg.HTTPServer,
s.logger) // Pass parent logger
if err := s.startHTTPServer(ls); err != nil {
ls.Shutdown()
s.logger.Error("msg", "Failed to start HTTP server",
"component", "service",
"stream", cfg.Name,
"port", cfg.HTTPServer.Port,
"error", err)
return fmt.Errorf("HTTP server failed: %w", err)
// Start all sources
for i, src := range pipeline.Sources {
if err := src.Start(); err != nil {
pipeline.Shutdown()
return fmt.Errorf("failed to start source[%d]: %w", i, err)
}
}
ls.startStatsUpdater(streamCtx)
// Start all sinks
for i, sinkInst := range pipeline.Sinks {
if err := sinkInst.Start(pipelineCtx); err != nil {
pipeline.Shutdown()
return fmt.Errorf("failed to start sink[%d]: %w", i, err)
}
}
s.streams[cfg.Name] = ls
s.logger.Info("msg", "Stream created successfully", "stream", cfg.Name)
// Wire sources to sinks through filters
s.wirePipeline(pipeline)
// Start stats updater
pipeline.startStatsUpdater(pipelineCtx)
s.pipelines[cfg.Name] = pipeline
s.logger.Info("msg", "Pipeline created successfully", "pipeline", cfg.Name)
return nil
}
// filterLoop applies filters to log entries
func (s *Service) filterLoop(ctx context.Context, in <-chan monitor.LogEntry, out chan<- monitor.LogEntry, chain *filter.Chain) {
for {
select {
case <-ctx.Done():
return
case entry, ok := <-in:
if !ok {
return
}
// wirePipeline connects sources to sinks through filters
func (s *Service) wirePipeline(p *Pipeline) {
// For each source, subscribe and process entries
for _, src := range p.Sources {
srcChan := src.Subscribe()
// Apply filter chain if configured
if chain == nil || chain.Apply(entry) {
// Create a processing goroutine for this source
p.wg.Add(1)
go func(source source.Source, entries <-chan source.LogEntry) {
defer p.wg.Done()
for {
select {
case out <- entry:
case <-ctx.Done():
case <-p.ctx.Done():
return
default:
// Drop if output buffer is full
s.logger.Debug("msg", "Dropped log entry - buffer full")
case entry, ok := <-entries:
if !ok {
return
}
p.Stats.TotalEntriesProcessed.Add(1)
// Apply filters if configured
if p.FilterChain != nil {
if !p.FilterChain.Apply(entry) {
p.Stats.TotalEntriesFiltered.Add(1)
continue
}
}
// Send to all sinks
for _, sinkInst := range p.Sinks {
select {
case sinkInst.Input() <- entry:
case <-p.ctx.Done():
return
default:
// Drop if sink buffer is full
s.logger.Debug("msg", "Dropped log entry - sink buffer full",
"pipeline", p.Name)
}
}
}
}
}
}(src, srcChan)
}
}
func (s *Service) GetStream(name string) (*LogStream, error) {
// createSource creates a source instance based on configuration
func (s *Service) createSource(cfg config.SourceConfig) (source.Source, error) {
switch cfg.Type {
case "directory":
return source.NewDirectorySource(cfg.Options, s.logger)
case "stdin":
return source.NewStdinSource(cfg.Options, s.logger)
default:
return nil, fmt.Errorf("unknown source type: %s", cfg.Type)
}
}
// createSink creates a sink instance based on configuration
func (s *Service) createSink(cfg config.SinkConfig) (sink.Sink, error) {
switch cfg.Type {
case "http":
return sink.NewHTTPSink(cfg.Options, s.logger)
case "tcp":
return sink.NewTCPSink(cfg.Options, s.logger)
case "file":
return sink.NewFileSink(cfg.Options, s.logger)
case "stdout":
return sink.NewStdoutSink(cfg.Options, s.logger)
case "stderr":
return sink.NewStderrSink(cfg.Options, s.logger)
default:
return nil, fmt.Errorf("unknown sink type: %s", cfg.Type)
}
}
// GetPipeline returns a pipeline by name
func (s *Service) GetPipeline(name string) (*Pipeline, error) {
s.mu.RLock()
defer s.mu.RUnlock()
stream, exists := s.streams[name]
pipeline, exists := s.pipelines[name]
if !exists {
return nil, fmt.Errorf("transport '%s' not found", name)
return nil, fmt.Errorf("pipeline '%s' not found", name)
}
return stream, nil
return pipeline, nil
}
// ListStreams is deprecated, use ListPipelines
func (s *Service) ListStreams() []string {
s.logger.Warn("msg", "ListStreams is deprecated, use ListPipelines",
"component", "service")
return s.ListPipelines()
}
// ListPipelines returns all pipeline names
func (s *Service) ListPipelines() []string {
s.mu.RLock()
defer s.mu.RUnlock()
names := make([]string, 0, len(s.streams))
for name := range s.streams {
names := make([]string, 0, len(s.pipelines))
for name := range s.pipelines {
names = append(names, name)
}
return names
}
// RemoveStream is deprecated, use RemovePipeline
func (s *Service) RemoveStream(name string) error {
s.logger.Warn("msg", "RemoveStream is deprecated, use RemovePipeline",
"component", "service")
return s.RemovePipeline(name)
}
// RemovePipeline stops and removes a pipeline
func (s *Service) RemovePipeline(name string) error {
s.mu.Lock()
defer s.mu.Unlock()
stream, exists := s.streams[name]
pipeline, exists := s.pipelines[name]
if !exists {
err := fmt.Errorf("transport '%s' not found", name)
s.logger.Warn("msg", "Cannot remove non-existent stream",
err := fmt.Errorf("pipeline '%s' not found", name)
s.logger.Warn("msg", "Cannot remove non-existent pipeline",
"component", "service",
"stream", name,
"pipeline", name,
"error", err)
return err
}
s.logger.Info("msg", "Removing stream", "stream", name)
stream.Shutdown()
delete(s.streams, name)
s.logger.Info("msg", "Removing pipeline", "pipeline", name)
pipeline.Shutdown()
delete(s.pipelines, name)
return nil
}
// Shutdown stops all pipelines
func (s *Service) Shutdown() {
s.logger.Info("msg", "Service shutdown initiated")
s.mu.Lock()
streams := make([]*LogStream, 0, len(s.streams))
for _, stream := range s.streams {
streams = append(streams, stream)
pipelines := make([]*Pipeline, 0, len(s.pipelines))
for _, pipeline := range s.pipelines {
pipelines = append(pipelines, pipeline)
}
s.mu.Unlock()
// Stop all streams concurrently
// Stop all pipelines concurrently
var wg sync.WaitGroup
for _, stream := range streams {
for _, pipeline := range pipelines {
wg.Add(1)
go func(ls *LogStream) {
go func(p *Pipeline) {
defer wg.Done()
ls.Shutdown()
}(stream)
p.Shutdown()
}(pipeline)
}
wg.Wait()
@ -266,68 +297,19 @@ func (s *Service) Shutdown() {
s.logger.Info("msg", "Service shutdown complete")
}
// GetGlobalStats returns statistics for all pipelines
func (s *Service) GetGlobalStats() map[string]any {
s.mu.RLock()
defer s.mu.RUnlock()
stats := map[string]any{
"streams": make(map[string]any),
"total_streams": len(s.streams),
"pipelines": make(map[string]any),
"total_pipelines": len(s.pipelines),
}
for name, stream := range s.streams {
stats["streams"].(map[string]any)[name] = stream.GetStats()
for name, pipeline := range s.pipelines {
stats["pipelines"].(map[string]any)[name] = pipeline.GetStats()
}
return stats
}
func (s *Service) startTCPServer(ls *LogStream) error {
errChan := make(chan error, 1)
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := ls.TCPServer.Start(); err != nil {
errChan <- err
}
}()
// Check startup
select {
case err := <-errChan:
s.logger.Error("msg", "TCP server startup failed immediately",
"component", "service",
"stream", ls.Name,
"error", err)
return err
case <-time.After(time.Second):
s.logger.Debug("msg", "TCP server started", "stream", ls.Name)
return nil
}
}
func (s *Service) startHTTPServer(ls *LogStream) error {
errChan := make(chan error, 1)
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := ls.HTTPServer.Start(); err != nil {
errChan <- err
}
}()
// Check startup
select {
case err := <-errChan:
s.logger.Error("msg", "HTTP server startup failed immediately",
"component", "service",
"stream", ls.Name,
"error", err)
return err
case <-time.After(time.Second):
s.logger.Debug("msg", "HTTP server started", "stream", ls.Name)
return nil
}
}