From e88812bb096fd30ff3e83c6a960b5184ef954689c7868fc8f4a2533a31cf2adc Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Tue, 15 Jul 2025 01:24:41 -0400 Subject: [PATCH] v0.3.5 centralized formattig, refactored --- .gitignore | 2 + go.mod | 3 +- go.sum | 8 +- src/internal/config/pipeline.go | 4 + src/internal/format/format.go | 38 +++++++ src/internal/format/json.go | 157 +++++++++++++++++++++++++++ src/internal/format/raw.go | 31 ++++++ src/internal/format/text.go | 108 ++++++++++++++++++ src/internal/service/httprouter.go | 6 - src/internal/service/routerserver.go | 2 +- src/internal/service/service.go | 47 ++++++-- src/internal/sink/console.go | 34 +++--- src/internal/sink/file.go | 26 +++-- src/internal/sink/http.go | 91 +++++++++------- src/internal/sink/http_client.go | 39 ++++++- src/internal/sink/tcp.go | 67 +++++++----- src/internal/sink/tcp_client.go | 25 +++-- src/internal/source/directory.go | 3 +- src/internal/source/file_watcher.go | 2 +- 19 files changed, 560 insertions(+), 133 deletions(-) create mode 100644 src/internal/format/format.go create mode 100644 src/internal/format/json.go create mode 100644 src/internal/format/raw.go create mode 100644 src/internal/format/text.go diff --git a/.gitignore b/.gitignore index 1d5f65f..64d71d5 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ cert bin script build +*.log +*.toml \ No newline at end of file diff --git a/go.mod b/go.mod index 9a589bc..91e2029 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.5 require ( github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 - github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f + github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac github.com/panjf2000/gnet/v2 v2.9.1 github.com/valyala/fasthttp v1.63.0 ) @@ -12,6 +12,7 @@ require ( require ( github.com/BurntSushi/toml v1.5.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/panjf2000/ants/v2 v2.11.3 // indirect diff --git a/go.sum b/go.sum index 49d5fa9..8632220 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,12 @@ github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zt github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497 h1:ixTIdJSd945n/IhMRwGwQVmQnQ1nUr5z1wn31jXq9FU= github.com/lixenwraith/config v0.0.0-20250712170030-7d38402e0497/go.mod h1:y7kgDrWIFROWJJ6ASM/SPTRRAj27FjRGWh2SDLcdQ68= -github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f h1:GBSik9B/Eaqz1ogSbG6C+Ti199k8cd7atnNSkCdkfV4= -github.com/lixenwraith/log v0.0.0-20250713052337-7b17ce48112f/go.mod h1:lEjTIMWGW+XGn20x5Ec0qkNK4LCd7Y/04PII51crYKk= +github.com/lixenwraith/log v0.0.0-20250713210809-0ac292ae5dc1 h1:kcZRASUvPdqnvgMxqxx/FZCWzCwz4bA7ArT8L3djZtk= +github.com/lixenwraith/log v0.0.0-20250713210809-0ac292ae5dc1/go.mod h1:BLWEFFryXtvvdUQkD+atik4uTyukO7gRubWpSNdW210= +github.com/lixenwraith/log v0.0.0-20250714221910-15e54d455464 h1:94riru1LpECWoIca4mnVW/9O1a9jUOB2HaeMSbKmDJQ= +github.com/lixenwraith/log v0.0.0-20250714221910-15e54d455464/go.mod h1:egVvySkgFmQXAlekEpeBqGVmopd09tP6BZB58JQJEfM= +github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac h1:PfbHbKeCHQnzRlSOLhzd5OJofx2EJKzZX7yc0/xuw3w= +github.com/lixenwraith/log v0.0.0-20250715004922-6d83a0eac2ac/go.mod h1:egVvySkgFmQXAlekEpeBqGVmopd09tP6BZB58JQJEfM= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= diff --git a/src/internal/config/pipeline.go b/src/internal/config/pipeline.go index 609e38a..efb5abc 100644 --- a/src/internal/config/pipeline.go +++ b/src/internal/config/pipeline.go @@ -23,6 +23,10 @@ type PipelineConfig struct { // Filter configuration Filters []FilterConfig `toml:"filters"` + // Log formatting configuration + Format string `toml:"format"` + FormatOptions map[string]any `toml:"format_options"` + // Output sinks for this pipeline Sinks []SinkConfig `toml:"sinks"` diff --git a/src/internal/format/format.go b/src/internal/format/format.go new file mode 100644 index 0000000..5848bd5 --- /dev/null +++ b/src/internal/format/format.go @@ -0,0 +1,38 @@ +// FILE: src/internal/format/format.go +package format + +import ( + "fmt" + + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" +) + +// Formatter defines the interface for transforming a LogEntry into a byte slice. +type Formatter interface { + // Format takes a LogEntry and returns the formatted log as a byte slice. + Format(entry source.LogEntry) ([]byte, error) + + // Name returns the formatter type name + Name() string +} + +// New creates a new Formatter based on the provided configuration. +func New(name string, options map[string]any, logger *log.Logger) (Formatter, error) { + // Default to raw if no format specified + if name == "" { + name = "raw" + } + + switch name { + case "json": + return NewJSONFormatter(options, logger) + case "text": + return NewTextFormatter(options, logger) + case "raw": + return NewRawFormatter(options, logger) + default: + return nil, fmt.Errorf("unknown formatter type: %s", name) + } +} \ No newline at end of file diff --git a/src/internal/format/json.go b/src/internal/format/json.go new file mode 100644 index 0000000..7c2f42b --- /dev/null +++ b/src/internal/format/json.go @@ -0,0 +1,157 @@ +// FILE: src/internal/format/json.go +package format + +import ( + "encoding/json" + "fmt" + "time" + + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" +) + +// JSONFormatter produces structured JSON logs +type JSONFormatter struct { + pretty bool + timestampField string + levelField string + messageField string + sourceField string + logger *log.Logger +} + +// NewJSONFormatter creates a new JSON formatter +func NewJSONFormatter(options map[string]any, logger *log.Logger) (*JSONFormatter, error) { + f := &JSONFormatter{ + timestampField: "timestamp", + levelField: "level", + messageField: "message", + sourceField: "source", + logger: logger, + } + + // Extract options + if pretty, ok := options["pretty"].(bool); ok { + f.pretty = pretty + } + if field, ok := options["timestamp_field"].(string); ok && field != "" { + f.timestampField = field + } + if field, ok := options["level_field"].(string); ok && field != "" { + f.levelField = field + } + if field, ok := options["message_field"].(string); ok && field != "" { + f.messageField = field + } + if field, ok := options["source_field"].(string); ok && field != "" { + f.sourceField = field + } + + return f, nil +} + +// Format formats the log entry as JSON +func (f *JSONFormatter) Format(entry source.LogEntry) ([]byte, error) { + // Start with a clean map + output := make(map[string]any) + + // First, populate with LogWisp metadata + output[f.timestampField] = entry.Time.Format(time.RFC3339Nano) + output[f.levelField] = entry.Level + output[f.sourceField] = entry.Source + + // Try to parse the message as JSON + var msgData map[string]any + if err := json.Unmarshal([]byte(entry.Message), &msgData); err == nil { + // Message is valid JSON - merge fields + // LogWisp metadata takes precedence + for k, v := range msgData { + // Don't overwrite our standard fields + if k != f.timestampField && k != f.levelField && k != f.sourceField { + output[k] = v + } + } + + // If the original JSON had these fields, log that we're overriding + if _, hasTime := msgData[f.timestampField]; hasTime { + f.logger.Debug("msg", "Overriding timestamp from JSON message", + "component", "json_formatter", + "original", msgData[f.timestampField], + "logwisp", output[f.timestampField]) + } + } else { + // Message is not valid JSON - add as message field + output[f.messageField] = entry.Message + } + + // Add any additional fields from LogEntry.Fields + if len(entry.Fields) > 0 { + var fields map[string]any + if err := json.Unmarshal(entry.Fields, &fields); err == nil { + // Merge additional fields, but don't override existing + for k, v := range fields { + if _, exists := output[k]; !exists { + output[k] = v + } + } + } + } + + // Marshal to JSON + var result []byte + var err error + if f.pretty { + result, err = json.MarshalIndent(output, "", " ") + } else { + result, err = json.Marshal(output) + } + + if err != nil { + return nil, fmt.Errorf("failed to marshal JSON: %w", err) + } + + // Add newline + return append(result, '\n'), nil +} + +// Name returns the formatter name +func (f *JSONFormatter) Name() string { + return "json" +} + +// FormatBatch formats multiple entries as a JSON array +// This is a special method for sinks that need to batch entries +func (f *JSONFormatter) FormatBatch(entries []source.LogEntry) ([]byte, error) { + // For batching, we need to create an array of formatted objects + batch := make([]json.RawMessage, 0, len(entries)) + + for _, entry := range entries { + // Format each entry without the trailing newline + formatted, err := f.Format(entry) + if err != nil { + f.logger.Warn("msg", "Failed to format entry in batch", + "component", "json_formatter", + "error", err) + continue + } + + // Remove the trailing newline for array elements + if len(formatted) > 0 && formatted[len(formatted)-1] == '\n' { + formatted = formatted[:len(formatted)-1] + } + + batch = append(batch, formatted) + } + + // Marshal the entire batch as an array + var result []byte + var err error + if f.pretty { + result, err = json.MarshalIndent(batch, "", " ") + } else { + result, err = json.Marshal(batch) + } + + return result, err +} \ No newline at end of file diff --git a/src/internal/format/raw.go b/src/internal/format/raw.go new file mode 100644 index 0000000..54be4da --- /dev/null +++ b/src/internal/format/raw.go @@ -0,0 +1,31 @@ +// FILE: src/internal/format/raw.go +package format + +import ( + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" +) + +// RawFormatter outputs the log message as-is with a newline +type RawFormatter struct { + logger *log.Logger +} + +// NewRawFormatter creates a new raw formatter +func NewRawFormatter(options map[string]any, logger *log.Logger) (*RawFormatter, error) { + return &RawFormatter{ + logger: logger, + }, nil +} + +// Format returns the message with a newline appended +func (f *RawFormatter) Format(entry source.LogEntry) ([]byte, error) { + // Simply return the message with newline + return append([]byte(entry.Message), '\n'), nil +} + +// Name returns the formatter name +func (f *RawFormatter) Name() string { + return "raw" +} \ No newline at end of file diff --git a/src/internal/format/text.go b/src/internal/format/text.go new file mode 100644 index 0000000..8cd9b60 --- /dev/null +++ b/src/internal/format/text.go @@ -0,0 +1,108 @@ +// FILE: src/internal/format/text.go +package format + +import ( + "bytes" + "fmt" + "strings" + "text/template" + "time" + + "logwisp/src/internal/source" + + "github.com/lixenwraith/log" +) + +// TextFormatter produces human-readable text logs using templates +type TextFormatter struct { + template *template.Template + timestampFormat string + logger *log.Logger +} + +// NewTextFormatter creates a new text formatter +func NewTextFormatter(options map[string]any, logger *log.Logger) (*TextFormatter, error) { + // Default template + templateStr := "[{{.Timestamp | FmtTime}}] [{{.Level | ToUpper}}] {{.Source}} - {{.Message}}{{ if .Fields }} {{.Fields}}{{ end }}" + if tmpl, ok := options["template"].(string); ok && tmpl != "" { + templateStr = tmpl + } + + // Default timestamp format + timestampFormat := time.RFC3339 + if tsFormat, ok := options["timestamp_format"].(string); ok && tsFormat != "" { + timestampFormat = tsFormat + } + + f := &TextFormatter{ + timestampFormat: timestampFormat, + logger: logger, + } + + // Create template with helper functions + funcMap := template.FuncMap{ + "FmtTime": func(t time.Time) string { + return t.Format(f.timestampFormat) + }, + "ToUpper": strings.ToUpper, + "ToLower": strings.ToLower, + "TrimSpace": strings.TrimSpace, + } + + tmpl, err := template.New("log").Funcs(funcMap).Parse(templateStr) + if err != nil { + return nil, fmt.Errorf("invalid template: %w", err) + } + + f.template = tmpl + return f, nil +} + +// Format formats the log entry using the template +func (f *TextFormatter) Format(entry source.LogEntry) ([]byte, error) { + // Prepare data for template + data := map[string]any{ + "Timestamp": entry.Time, + "Level": entry.Level, + "Source": entry.Source, + "Message": entry.Message, + } + + // Set default level if empty + if data["Level"] == "" { + data["Level"] = "INFO" + } + + // Add fields if present + if len(entry.Fields) > 0 { + data["Fields"] = string(entry.Fields) + } + + var buf bytes.Buffer + if err := f.template.Execute(&buf, data); err != nil { + // Fallback: return a basic formatted message + f.logger.Debug("msg", "Template execution failed, using fallback", + "component", "text_formatter", + "error", err) + + fallback := fmt.Sprintf("[%s] [%s] %s - %s\n", + entry.Time.Format(f.timestampFormat), + strings.ToUpper(entry.Level), + entry.Source, + entry.Message) + return []byte(fallback), nil + } + + // Ensure newline at end + result := buf.Bytes() + if len(result) == 0 || result[len(result)-1] != '\n' { + result = append(result, '\n') + } + + return result, nil +} + +// Name returns the formatter name +func (f *TextFormatter) Name() string { + return "text" +} \ No newline at end of file diff --git a/src/internal/service/httprouter.go b/src/internal/service/httprouter.go index 1b35066..d44202b 100644 --- a/src/internal/service/httprouter.go +++ b/src/internal/service/httprouter.go @@ -141,12 +141,6 @@ func (r *HTTPRouter) registerHTTPSink(pipelineName string, httpSink *sink.HTTPSi return nil } -// UnregisterStream is deprecated -func (r *HTTPRouter) UnregisterStream(streamName string) { - r.logger.Warn("msg", "UnregisterStream is deprecated", - "component", "http_router") -} - // UnregisterPipeline removes a pipeline's routes func (r *HTTPRouter) UnregisterPipeline(pipelineName string) { r.mu.RLock() diff --git a/src/internal/service/routerserver.go b/src/internal/service/routerserver.go index 9e6e273..129194f 100644 --- a/src/internal/service/routerserver.go +++ b/src/internal/service/routerserver.go @@ -62,7 +62,7 @@ func (rs *routerServer) requestHandler(ctx *fasthttp.RequestCtx) { for prefix, route := range rs.routes { if strings.HasPrefix(path, prefix) { - // Use longest prefix match + // Longest prefix match if len(prefix) > len(matchedPrefix) { matchedPrefix = prefix matchedSink = route diff --git a/src/internal/service/service.go b/src/internal/service/service.go index 24f55e9..394998f 100644 --- a/src/internal/service/service.go +++ b/src/internal/service/service.go @@ -4,12 +4,13 @@ package service import ( "context" "fmt" - "logwisp/src/internal/ratelimit" "sync" "time" "logwisp/src/internal/config" "logwisp/src/internal/filter" + "logwisp/src/internal/format" + "logwisp/src/internal/ratelimit" "logwisp/src/internal/sink" "logwisp/src/internal/source" @@ -98,9 +99,20 @@ func (s *Service) NewPipeline(cfg config.PipelineConfig) error { pipeline.FilterChain = chain } + // Create formatter for the pipeline + var formatter format.Formatter + var err error + if cfg.Format != "" || len(cfg.FormatOptions) > 0 { + formatter, err = format.New(cfg.Format, cfg.FormatOptions, s.logger) + if err != nil { + pipelineCancel() + return fmt.Errorf("failed to create formatter: %w", err) + } + } + // Create sinks for i, sinkCfg := range cfg.Sinks { - sinkInst, err := s.createSink(sinkCfg) + sinkInst, err := s.createSink(sinkCfg, formatter) // Pass formatter if err != nil { pipelineCancel() return fmt.Errorf("failed to create sink[%d]: %w", i, err) @@ -237,22 +249,37 @@ func (s *Service) createSource(cfg config.SourceConfig) (source.Source, error) { } // createSink creates a sink instance based on configuration -func (s *Service) createSink(cfg config.SinkConfig) (sink.Sink, error) { +func (s *Service) createSink(cfg config.SinkConfig, formatter format.Formatter) (sink.Sink, error) { + if formatter == nil { + // Default formatters for different sink types + defaultFormat := "raw" + switch cfg.Type { + case "http", "tcp", "http_client", "tcp_client": + defaultFormat = "json" + } + + var err error + formatter, err = format.New(defaultFormat, nil, s.logger) + if err != nil { + return nil, fmt.Errorf("failed to create default formatter: %w", err) + } + } + switch cfg.Type { case "http": - return sink.NewHTTPSink(cfg.Options, s.logger) + return sink.NewHTTPSink(cfg.Options, s.logger, formatter) // needs implementation case "tcp": - return sink.NewTCPSink(cfg.Options, s.logger) + return sink.NewTCPSink(cfg.Options, s.logger, formatter) // needs implementation case "http_client": - return sink.NewHTTPClientSink(cfg.Options, s.logger) + return sink.NewHTTPClientSink(cfg.Options, s.logger, formatter) // needs verification case "tcp_client": - return sink.NewTCPClientSink(cfg.Options, s.logger) + return sink.NewTCPClientSink(cfg.Options, s.logger, formatter) // needs implementation case "file": - return sink.NewFileSink(cfg.Options, s.logger) + return sink.NewFileSink(cfg.Options, s.logger, formatter) case "stdout": - return sink.NewStdoutSink(cfg.Options, s.logger) + return sink.NewStdoutSink(cfg.Options, s.logger, formatter) // needs implementation case "stderr": - return sink.NewStderrSink(cfg.Options, s.logger) + return sink.NewStderrSink(cfg.Options, s.logger, formatter) // needs implementation default: return nil, fmt.Errorf("unknown sink type: %s", cfg.Type) } diff --git a/src/internal/sink/console.go b/src/internal/sink/console.go index ec27c7a..1472358 100644 --- a/src/internal/sink/console.go +++ b/src/internal/sink/console.go @@ -3,13 +3,13 @@ package sink import ( "context" - "fmt" "io" "os" "strings" "sync/atomic" "time" + "logwisp/src/internal/format" "logwisp/src/internal/source" "github.com/lixenwraith/log" @@ -29,6 +29,7 @@ type StdoutSink struct { done chan struct{} startTime time.Time logger *log.Logger + formatter format.Formatter // Statistics totalProcessed atomic.Uint64 @@ -36,7 +37,7 @@ type StdoutSink struct { } // NewStdoutSink creates a new stdout sink -func NewStdoutSink(options map[string]any, logger *log.Logger) (*StdoutSink, error) { +func NewStdoutSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*StdoutSink, error) { config := ConsoleConfig{ Target: "stdout", BufferSize: 1000, @@ -58,6 +59,7 @@ func NewStdoutSink(options map[string]any, logger *log.Logger) (*StdoutSink, err done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } s.lastProcessed.Store(time.Time{}) @@ -117,14 +119,12 @@ func (s *StdoutSink) processLoop(ctx context.Context) { } // Format and write - timestamp := entry.Time.Format(time.RFC3339Nano) - level := entry.Level - if level == "" { - level = "INFO" + formatted, err := s.formatter.Format(entry) + if err != nil { + s.logger.Error("msg", "Failed to format log entry for stdout", "error", err) + continue } - - // Direct write to stdout - fmt.Fprintf(s.output, "[%s] %s %s\n", timestamp, level, entry.Message) + s.output.Write(formatted) case <-ctx.Done(): return @@ -142,6 +142,7 @@ type StderrSink struct { done chan struct{} startTime time.Time logger *log.Logger + formatter format.Formatter // Statistics totalProcessed atomic.Uint64 @@ -149,7 +150,7 @@ type StderrSink struct { } // NewStderrSink creates a new stderr sink -func NewStderrSink(options map[string]any, logger *log.Logger) (*StderrSink, error) { +func NewStderrSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*StderrSink, error) { config := ConsoleConfig{ Target: "stderr", BufferSize: 1000, @@ -171,6 +172,7 @@ func NewStderrSink(options map[string]any, logger *log.Logger) (*StderrSink, err done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } s.lastProcessed.Store(time.Time{}) @@ -230,14 +232,12 @@ func (s *StderrSink) processLoop(ctx context.Context) { } // Format and write - timestamp := entry.Time.Format(time.RFC3339Nano) - level := entry.Level - if level == "" { - level = "INFO" + formatted, err := s.formatter.Format(entry) + if err != nil { + s.logger.Error("msg", "Failed to format log entry for stderr", "error", err) + continue } - - // Direct write to stderr - fmt.Fprintf(s.output, "[%s] %s %s\n", timestamp, level, entry.Message) + s.output.Write(formatted) case <-ctx.Done(): return diff --git a/src/internal/sink/file.go b/src/internal/sink/file.go index a8d5ee2..7ec7a2b 100644 --- a/src/internal/sink/file.go +++ b/src/internal/sink/file.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "logwisp/src/internal/format" "logwisp/src/internal/source" "github.com/lixenwraith/log" @@ -19,6 +20,7 @@ type FileSink struct { done chan struct{} startTime time.Time logger *log.Logger // Application logger + formatter format.Formatter // Statistics totalProcessed atomic.Uint64 @@ -26,7 +28,7 @@ type FileSink struct { } // NewFileSink creates a new file sink -func NewFileSink(options map[string]any, logger *log.Logger) (*FileSink, error) { +func NewFileSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*FileSink, error) { directory, ok := options["directory"].(string) if !ok || directory == "" { return nil, fmt.Errorf("file sink requires 'directory' option") @@ -82,6 +84,7 @@ func NewFileSink(options map[string]any, logger *log.Logger) (*FileSink, error) done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } fs.lastProcessed.Store(time.Time{}) @@ -135,16 +138,21 @@ func (fs *FileSink) processLoop(ctx context.Context) { fs.totalProcessed.Add(1) fs.lastProcessed.Store(time.Now()) - // Format the log entry - // Include timestamp and level since we disabled them in the writer - timestamp := entry.Time.Format(time.RFC3339Nano) - level := entry.Level - if level == "" { - level = "INFO" + // Format using the formatter instead of fmt.Sprintf + formatted, err := fs.formatter.Format(entry) + if err != nil { + fs.logger.Error("msg", "Failed to format log entry", + "component", "file_sink", + "error", err) + continue } - // Write to file using the internal logger - fs.writer.Message(fmt.Sprintf("[%s] %s %s", timestamp, level, entry.Message)) + // Write formatted bytes (strip newline as writer adds it) + message := string(formatted) + if len(message) > 0 && message[len(message)-1] == '\n' { + message = message[:len(message)-1] + } + fs.writer.Message(message) case <-ctx.Done(): return diff --git a/src/internal/sink/http.go b/src/internal/sink/http.go index 62a03d3..eb15316 100644 --- a/src/internal/sink/http.go +++ b/src/internal/sink/http.go @@ -3,15 +3,16 @@ package sink import ( "bufio" + "bytes" "context" "encoding/json" "fmt" - "strings" "sync" "sync/atomic" "time" "logwisp/src/internal/config" + "logwisp/src/internal/format" "logwisp/src/internal/netlimit" "logwisp/src/internal/source" "logwisp/src/internal/version" @@ -32,6 +33,7 @@ type HTTPSink struct { done chan struct{} wg sync.WaitGroup logger *log.Logger + formatter format.Formatter // Path configuration streamPath string @@ -60,7 +62,7 @@ type HTTPConfig struct { } // NewHTTPSink creates a new HTTP streaming sink -func NewHTTPSink(options map[string]any, logger *log.Logger) (*HTTPSink, error) { +func NewHTTPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPSink, error) { cfg := HTTPConfig{ Port: 8080, BufferSize: 1000, @@ -91,8 +93,8 @@ func NewHTTPSink(options map[string]any, logger *log.Logger) (*HTTPSink, error) } cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool) cfg.Heartbeat.IncludeStats, _ = hb["include_stats"].(bool) - if format, ok := hb["format"].(string); ok { - cfg.Heartbeat.Format = format + if hbFormat, ok := hb["format"].(string); ok { + cfg.Heartbeat.Format = hbFormat } } @@ -132,6 +134,7 @@ func NewHTTPSink(options map[string]any, logger *log.Logger) (*HTTPSink, error) statusPath: cfg.StatusPath, standalone: true, logger: logger, + formatter: formatter, } h.lastProcessed.Store(time.Time{}) @@ -148,6 +151,7 @@ func (h *HTTPSink) Input() chan<- source.LogEntry { } func (h *HTTPSink) Start(ctx context.Context) error { + // TODO: use or remove unused ctx if !h.standalone { // In router mode, don't start our own server h.logger.Debug("msg", "HTTP sink in router mode, skipping server start", @@ -356,7 +360,7 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { "buffer_size": h.config.BufferSize, } data, _ := json.Marshal(connectionInfo) - fmt.Fprintf(w, "event: connected\ndata: %s\n\n", data) + fmt.Fprintf(w, "event: connected\ndata: %s\n", data) w.Flush() var ticker *time.Ticker @@ -375,27 +379,28 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { return } - data, err := json.Marshal(entry) - if err != nil { - h.logger.Error("msg", "Failed to marshal log entry", + if err := h.formatEntryForSSE(w, entry); err != nil { + h.logger.Error("msg", "Failed to format log entry", "component", "http_sink", "error", err, "entry_source", entry.Source) continue } - fmt.Fprintf(w, "data: %s\n\n", data) if err := w.Flush(); err != nil { - // Client disconnected, fasthttp handles cleanup + // Client disconnected return } case <-tickerChan: - if heartbeat := h.formatHeartbeat(); heartbeat != "" { - fmt.Fprint(w, heartbeat) - if err := w.Flush(); err != nil { - return - } + heartbeatEntry := h.createHeartbeatEntry() + if err := h.formatEntryForSSE(w, heartbeatEntry); err != nil { + h.logger.Error("msg", "Failed to format heartbeat", + "component", "http_sink", + "error", err) + } + if err := w.Flush(); err != nil { + return } case <-h.done: @@ -410,42 +415,46 @@ func (h *HTTPSink) handleStream(ctx *fasthttp.RequestCtx) { ctx.SetBodyStreamWriter(streamFunc) } -func (h *HTTPSink) formatHeartbeat() string { - if !h.config.Heartbeat.Enabled { - return "" +func (h *HTTPSink) formatEntryForSSE(w *bufio.Writer, entry source.LogEntry) error { + formatted, err := h.formatter.Format(entry) + if err != nil { + return err } - if h.config.Heartbeat.Format == "json" { - data := make(map[string]any) - data["type"] = "heartbeat" + // Remove trailing newline if present (SSE adds its own) + formatted = bytes.TrimSuffix(formatted, []byte{'\n'}) - if h.config.Heartbeat.IncludeTimestamp { - data["timestamp"] = time.Now().UTC().Format(time.RFC3339) - } - - if h.config.Heartbeat.IncludeStats { - data["active_clients"] = h.activeClients.Load() - data["uptime_seconds"] = int(time.Since(h.startTime).Seconds()) - } - - jsonData, _ := json.Marshal(data) - return fmt.Sprintf("data: %s\n\n", jsonData) + // Multi-line content handler + lines := bytes.Split(formatted, []byte{'\n'}) + for _, line := range lines { + // SSE needs "data: " prefix for each line + fmt.Fprintf(w, "data: %s\n", line) } - // Default comment format - var parts []string - parts = append(parts, "heartbeat") + return nil +} - if h.config.Heartbeat.IncludeTimestamp { - parts = append(parts, time.Now().UTC().Format(time.RFC3339)) - } +func (h *HTTPSink) createHeartbeatEntry() source.LogEntry { + message := "heartbeat" + + // Build fields for heartbeat metadata + fields := make(map[string]any) + fields["type"] = "heartbeat" if h.config.Heartbeat.IncludeStats { - parts = append(parts, fmt.Sprintf("clients=%d", h.activeClients.Load())) - parts = append(parts, fmt.Sprintf("uptime=%ds", int(time.Since(h.startTime).Seconds()))) + fields["active_clients"] = h.activeClients.Load() + fields["uptime_seconds"] = int(time.Since(h.startTime).Seconds()) } - return fmt.Sprintf(": %s\n\n", strings.Join(parts, " ")) + fieldsJSON, _ := json.Marshal(fields) + + return source.LogEntry{ + Time: time.Now(), + Source: "logwisp-http", + Level: "INFO", + Message: message, + Fields: fieldsJSON, + } } func (h *HTTPSink) handleStatus(ctx *fasthttp.RequestCtx) { diff --git a/src/internal/sink/http_client.go b/src/internal/sink/http_client.go index 3dc01d8..9327808 100644 --- a/src/internal/sink/http_client.go +++ b/src/internal/sink/http_client.go @@ -2,14 +2,15 @@ package sink import ( + "bytes" "context" - "encoding/json" "fmt" "net/url" "sync" "sync/atomic" "time" + "logwisp/src/internal/format" "logwisp/src/internal/source" "github.com/lixenwraith/log" @@ -27,6 +28,7 @@ type HTTPClientSink struct { wg sync.WaitGroup startTime time.Time logger *log.Logger + formatter format.Formatter // Statistics totalProcessed atomic.Uint64 @@ -56,7 +58,7 @@ type HTTPClientConfig struct { } // NewHTTPClientSink creates a new HTTP client sink -func NewHTTPClientSink(options map[string]any, logger *log.Logger) (*HTTPClientSink, error) { +func NewHTTPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*HTTPClientSink, error) { cfg := HTTPClientConfig{ BufferSize: 1000, BatchSize: 100, @@ -131,6 +133,7 @@ func NewHTTPClientSink(options map[string]any, logger *log.Logger) (*HTTPClientS done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } h.lastProcessed.Store(time.Time{}) h.lastBatchSent.Store(time.Time{}) @@ -296,10 +299,33 @@ func (h *HTTPClientSink) sendBatch(batch []source.LogEntry) { h.totalBatches.Add(1) h.lastBatchSent.Store(time.Now()) - // Prepare request body - body, err := json.Marshal(batch) + // Special handling for JSON formatter with batching + var body []byte + var err error + + if jsonFormatter, ok := h.formatter.(*format.JSONFormatter); ok { + // Use the batch formatting method + body, err = jsonFormatter.FormatBatch(batch) + } else { + // For non-JSON formatters, format each entry and combine + var formatted [][]byte + for _, entry := range batch { + entryBytes, err := h.formatter.Format(entry) + if err != nil { + h.logger.Error("msg", "Failed to format entry in batch", + "component", "http_client_sink", + "error", err) + continue + } + formatted = append(formatted, entryBytes) + } + + // For raw/text formats, join with newlines + body = bytes.Join(formatted, nil) + } + if err != nil { - h.logger.Error("msg", "Failed to marshal batch", + h.logger.Error("msg", "Failed to format batch", "component", "http_client_sink", "error", err, "batch_size", len(batch)) @@ -318,10 +344,11 @@ func (h *HTTPClientSink) sendBatch(batch []source.LogEntry) { retryDelay = time.Duration(float64(retryDelay) * h.config.RetryBackoff) } + // TODO: defer placement issue // Create request req := fasthttp.AcquireRequest() - resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseRequest(req) + resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) req.SetRequestURI(h.config.URL) diff --git a/src/internal/sink/tcp.go b/src/internal/sink/tcp.go index 4816430..8d69825 100644 --- a/src/internal/sink/tcp.go +++ b/src/internal/sink/tcp.go @@ -11,6 +11,7 @@ import ( "time" "logwisp/src/internal/config" + "logwisp/src/internal/format" "logwisp/src/internal/netlimit" "logwisp/src/internal/source" @@ -31,6 +32,7 @@ type TCPSink struct { wg sync.WaitGroup netLimiter *netlimit.Limiter logger *log.Logger + formatter format.Formatter // Statistics totalProcessed atomic.Uint64 @@ -47,7 +49,7 @@ type TCPConfig struct { } // NewTCPSink creates a new TCP streaming sink -func NewTCPSink(options map[string]any, logger *log.Logger) (*TCPSink, error) { +func NewTCPSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPSink, error) { cfg := TCPConfig{ Port: 9090, BufferSize: 1000, @@ -70,8 +72,8 @@ func NewTCPSink(options map[string]any, logger *log.Logger) (*TCPSink, error) { } cfg.Heartbeat.IncludeTimestamp, _ = hb["include_timestamp"].(bool) cfg.Heartbeat.IncludeStats, _ = hb["include_stats"].(bool) - if format, ok := hb["format"].(string); ok { - cfg.Heartbeat.Format = format + if hbFormat, ok := hb["format"].(string); ok { + cfg.Heartbeat.Format = hbFormat } } @@ -108,6 +110,7 @@ func NewTCPSink(options map[string]any, logger *log.Logger) (*TCPSink, error) { done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } t.lastProcessed.Store(time.Time{}) @@ -233,15 +236,14 @@ func (t *TCPSink) broadcastLoop() { t.totalProcessed.Add(1) t.lastProcessed.Store(time.Now()) - data, err := json.Marshal(entry) + data, err := t.formatter.Format(entry) if err != nil { - t.logger.Error("msg", "Failed to marshal log entry", + t.logger.Error("msg", "Failed to format log entry", "component", "tcp_sink", "error", err, "entry_source", entry.Source) continue } - data = append(data, '\n') t.server.connections.Range(func(key, value any) bool { conn := key.(gnet.Conn) @@ -250,40 +252,49 @@ func (t *TCPSink) broadcastLoop() { }) case <-tickerChan: - if heartbeat := t.formatHeartbeat(); heartbeat != nil { - t.server.connections.Range(func(key, value any) bool { - conn := key.(gnet.Conn) - conn.AsyncWrite(heartbeat, nil) - return true - }) + heartbeatEntry := t.createHeartbeatEntry() + data, err := t.formatter.Format(heartbeatEntry) + if err != nil { + t.logger.Error("msg", "Failed to format heartbeat", + "component", "tcp_sink", + "error", err) + continue } + t.server.connections.Range(func(key, value any) bool { + conn := key.(gnet.Conn) + conn.AsyncWrite(data, nil) + return true + }) + case <-t.done: return } } } -func (t *TCPSink) formatHeartbeat() []byte { - if !t.config.Heartbeat.Enabled { - return nil - } +// Create heartbeat as a proper LogEntry +func (t *TCPSink) createHeartbeatEntry() source.LogEntry { + message := "heartbeat" - data := make(map[string]any) - data["type"] = "heartbeat" - - if t.config.Heartbeat.IncludeTimestamp { - data["time"] = time.Now().UTC().Format(time.RFC3339Nano) - } + // Build fields for heartbeat metadata + fields := make(map[string]any) + fields["type"] = "heartbeat" if t.config.Heartbeat.IncludeStats { - data["active_connections"] = t.activeConns.Load() - data["uptime_seconds"] = int(time.Since(t.startTime).Seconds()) + fields["active_connections"] = t.activeConns.Load() + fields["uptime_seconds"] = int(time.Since(t.startTime).Seconds()) } - // For TCP, always use JSON format - jsonData, _ := json.Marshal(data) - return append(jsonData, '\n') + fieldsJSON, _ := json.Marshal(fields) + + return source.LogEntry{ + Time: time.Now(), + Source: "logwisp-tcp", + Level: "INFO", + Message: message, + Fields: fieldsJSON, + } } // GetActiveConnections returns the current number of connections @@ -371,7 +382,7 @@ func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } -// noopLogger implements gnet's Logger interface but discards everything +// noopLogger implements gnet Logger interface but discards everything type noopLogger struct{} func (n noopLogger) Debugf(format string, args ...any) {} diff --git a/src/internal/sink/tcp_client.go b/src/internal/sink/tcp_client.go index 26ee669..93bf697 100644 --- a/src/internal/sink/tcp_client.go +++ b/src/internal/sink/tcp_client.go @@ -3,13 +3,14 @@ package sink import ( "context" - "encoding/json" + "errors" "fmt" "net" "sync" "sync/atomic" "time" + "logwisp/src/internal/format" "logwisp/src/internal/source" "github.com/lixenwraith/log" @@ -25,6 +26,7 @@ type TCPClientSink struct { wg sync.WaitGroup startTime time.Time logger *log.Logger + formatter format.Formatter // Reconnection state reconnecting atomic.Bool @@ -54,7 +56,7 @@ type TCPClientConfig struct { } // NewTCPClientSink creates a new TCP client sink -func NewTCPClientSink(options map[string]any, logger *log.Logger) (*TCPClientSink, error) { +func NewTCPClientSink(options map[string]any, logger *log.Logger, formatter format.Formatter) (*TCPClientSink, error) { cfg := TCPClientConfig{ BufferSize: 1000, DialTimeout: 10 * time.Second, @@ -107,6 +109,7 @@ func NewTCPClientSink(options map[string]any, logger *log.Logger) (*TCPClientSin done: make(chan struct{}), startTime: time.Now(), logger: logger, + formatter: formatter, } t.lastProcessed.Store(time.Time{}) t.connectionUptime.Store(time.Duration(0)) @@ -141,7 +144,7 @@ func (t *TCPClientSink) Stop() { // Close connection t.connMu.Lock() if t.conn != nil { - t.conn.Close() + _ = t.conn.Close() } t.connMu.Unlock() @@ -292,12 +295,17 @@ func (t *TCPClientSink) monitorConnection(conn net.Conn) { return case <-ticker.C: // Set read deadline - conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + // TODO: Add t.config.ReadTimeout instead of static value + if err := conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)); err != nil { + t.logger.Debug("msg", "Failed to set read deadline", "error", err) + return + } // Try to read (we don't expect any data) _, err := conn.Read(buf) if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { // Timeout is expected, connection is still alive continue } @@ -347,15 +355,12 @@ func (t *TCPClientSink) sendEntry(entry source.LogEntry) error { return fmt.Errorf("not connected") } - // Marshal to JSON - data, err := json.Marshal(entry) + // Format data + data, err := t.formatter.Format(entry) if err != nil { return fmt.Errorf("failed to marshal entry: %w", err) } - // Add newline - data = append(data, '\n') - // Set write deadline if err := conn.SetWriteDeadline(time.Now().Add(t.config.WriteTimeout)); err != nil { return fmt.Errorf("failed to set write deadline: %w", err) diff --git a/src/internal/source/directory.go b/src/internal/source/directory.go index 6707a52..2b6ece5 100644 --- a/src/internal/source/directory.go +++ b/src/internal/source/directory.go @@ -3,6 +3,7 @@ package source import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -245,7 +246,7 @@ func (ds *DirectorySource) ensureWatcher(path string) { go func() { defer ds.wg.Done() if err := w.watch(ds.ctx); err != nil { - if err == context.Canceled { + if errors.Is(err, context.Canceled) { ds.logger.Debug("msg", "Watcher cancelled", "component", "directory_source", "path", path) diff --git a/src/internal/source/file_watcher.go b/src/internal/source/file_watcher.go index 8412edc..99c97a5 100644 --- a/src/internal/source/file_watcher.go +++ b/src/internal/source/file_watcher.go @@ -279,7 +279,7 @@ func (w *fileWatcher) checkFile() error { // Update position after successful read currentPos, err := file.Seek(0, io.SeekCurrent) if err != nil { - // Log error but don't fail - position tracking is best effort + // Log error but don't fail - best effort position tracking w.logger.Warn("msg", "Failed to get file position", "error", err) // Use size as fallback position currentPos = currentSize