From 069818bf3d093a7c0c39f38a07279fbd191a5009dd611b0c70bdf6a4abc7e2c1 Mon Sep 17 00:00:00 2001 From: Lixen Wraith Date: Mon, 7 Jul 2025 13:08:22 -0400 Subject: [PATCH] v0.1.5 multi-target support, package refactoring --- README.md | 504 ++++++++++++------ config/logwisp.toml | 226 ++++---- src/cmd/logwisp/main.go | 292 +++++----- src/internal/config/auth.go | 56 ++ src/internal/config/config.go | 243 +-------- src/internal/config/loader.go | 102 ++++ src/internal/config/server.go | 62 +++ src/internal/config/ssl.go | 20 + src/internal/config/stream.go | 42 ++ src/internal/config/validation.go | 187 +++++++ src/internal/logstream/httprouter.go | 107 ++++ src/internal/logstream/logstream.go | 124 +++++ src/internal/logstream/routerserver.go | 118 ++++ src/internal/logstream/service.go | 235 ++++++++ src/internal/monitor/file_watcher.go | 186 +++++-- src/internal/monitor/monitor.go | 153 +++++- .../stream/{http.go => httpstreamer.go} | 134 ++++- src/internal/stream/tcpserver.go | 52 ++ .../stream/{tcp.go => tcpstreamer.go} | 42 +- 19 files changed, 2058 insertions(+), 827 deletions(-) create mode 100644 src/internal/config/auth.go create mode 100644 src/internal/config/loader.go create mode 100644 src/internal/config/server.go create mode 100644 src/internal/config/ssl.go create mode 100644 src/internal/config/stream.go create mode 100644 src/internal/config/validation.go create mode 100644 src/internal/logstream/httprouter.go create mode 100644 src/internal/logstream/logstream.go create mode 100644 src/internal/logstream/routerserver.go create mode 100644 src/internal/logstream/service.go rename src/internal/stream/{http.go => httpstreamer.go} (61%) create mode 100644 src/internal/stream/tcpserver.go rename src/internal/stream/{tcp.go => tcpstreamer.go} (77%) diff --git a/README.md b/README.md index 3a09345..2a47d69 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,22 @@ +# LogWisp - Multi-Stream Log Monitoring Service +

LogWisp Logo

-# LogWisp - Dual-Stack Log Streaming - -A high-performance log streaming service with dual-stack architecture: raw TCP streaming via gnet and HTTP/SSE streaming via fasthttp. +A high-performance log streaming service with multi-stream architecture, supporting both TCP and HTTP/SSE protocols with real-time file monitoring and rotation detection. ## Features -- **Dual streaming modes**: TCP (gnet) and HTTP/SSE (fasthttp) -- **Fan-out architecture**: Multiple independent consumers -- **Real-time updates**: File monitoring with rotation detection -- **Zero dependencies**: Only gnet and fasthttp beyond stdlib -- **High performance**: Non-blocking I/O throughout +- **Multi-Stream Architecture**: Run multiple independent log streams, each with its own configuration +- **Dual Protocol Support**: TCP (raw streaming) and HTTP/SSE (browser-friendly) +- **Real-time Monitoring**: Instant updates with configurable check intervals +- **File Rotation Detection**: Automatic detection and handling of log rotation +- **Path-based Routing**: Optional HTTP router for consolidated access +- **Per-Stream Configuration**: Independent settings for each log stream +- **Connection Statistics**: Real-time monitoring of active connections +- **Flexible Targets**: Monitor individual files or entire directories +- **Zero Dependencies**: Only gnet and fasthttp beyond stdlib ## Quick Start @@ -20,200 +24,334 @@ A high-performance log streaming service with dual-stack architecture: raw TCP s # Build go build -o logwisp ./src/cmd/logwisp -# Run with HTTP only (default) +# Run with default configuration ./logwisp -# Enable both TCP and HTTP -./logwisp --enable-tcp --tcp-port 9090 +# Run with custom config +./logwisp --config /etc/logwisp/production.toml -# Monitor specific paths -./logwisp /var/log:*.log /app/logs:error*.log +# Run with HTTP router (path-based routing) +./logwisp --router ``` ## Architecture +LogWisp uses a service-oriented architecture where each stream is an independent pipeline: + ``` -Monitor (Publisher) → [Subscriber Channels] → TCP Server (default port 9090) - ↘ HTTP Server (default port 8080) -``` - -## Command Line Options - -```bash -logwisp [OPTIONS] [TARGET...] - -OPTIONS: - --config FILE Config file path - --check-interval MS File check interval (default: 100) - - # TCP Server - --enable-tcp Enable TCP server - --tcp-port PORT TCP port (default: 9090) - --tcp-buffer-size SIZE TCP buffer size (default: 1000) - - # HTTP Server - --enable-http Enable HTTP server (default: true) - --http-port PORT HTTP port (default: 8080) - --http-buffer-size SIZE HTTP buffer size (default: 1000) - -TARGET: - path[:pattern[:isfile]] Path to monitor - pattern: glob pattern for directories - isfile: true/false (auto-detected if omitted) +LogStream Service +├── Stream["app-logs"] +│ ├── Monitor (watches files) +│ ├── TCP Server (optional) +│ └── HTTP Server (optional) +├── Stream["system-logs"] +│ ├── Monitor +│ └── HTTP Server +└── HTTP Router (optional, for path-based routing) ``` ## Configuration -Config file location: `~/.config/logwisp.toml` +Configuration file location: `~/.config/logwisp.toml` + +### Basic Multi-Stream Configuration ```toml +# Global defaults [monitor] check_interval_ms = 100 -[[monitor.targets]] -path = "./" -pattern = "*.log" -is_file = false +# Application logs stream +[[streams]] +name = "app" -[tcpserver] -enabled = false -port = 9090 -buffer_size = 1000 +[streams.monitor] +targets = [ + { path = "/var/log/myapp", pattern = "*.log", is_file = false }, + { path = "/var/log/myapp/app.log", is_file = true } +] -[httpserver] +[streams.httpserver] enabled = true port = 8080 -buffer_size = 1000 +buffer_size = 2000 +stream_path = "/stream" +status_path = "/status" + +# System logs stream +[[streams]] +name = "system" + +[streams.monitor] +check_interval_ms = 50 # Override global default +targets = [ + { path = "/var/log/syslog", is_file = true }, + { path = "/var/log/auth.log", is_file = true } +] + +[streams.tcpserver] +enabled = true +port = 9090 +buffer_size = 5000 + +[streams.httpserver] +enabled = true +port = 8443 +stream_path = "/logs" +status_path = "/health" ``` -## Clients +### Target Configuration + +Monitor targets support both files and directories: + +```toml +# Directory monitoring with pattern +{ path = "/var/log", pattern = "*.log", is_file = false } + +# Specific file monitoring +{ path = "/var/log/app.log", is_file = true } + +# All .log files in a directory +{ path = "./logs", pattern = "*.log", is_file = false } +``` + +## Usage Modes + +### 1. Standalone Mode (Default) + +Each stream runs on its configured ports: + +```bash +./logwisp +# Stream endpoints: +# - app: http://localhost:8080/stream +# - system: tcp://localhost:9090 and https://localhost:8443/logs +``` + +### 2. Router Mode + +All HTTP streams share ports with path-based routing: + +```bash +./logwisp --router +# Routed endpoints: +# - app: http://localhost:8080/app/stream +# - system: http://localhost:8080/system/logs +# - global: http://localhost:8080/status +``` + +## Client Examples + +### HTTP/SSE Stream + +```bash +# Connect to a stream +curl -N http://localhost:8080/stream + +# Check stream status +curl http://localhost:8080/status + +# With authentication (when implemented) +curl -u admin:password -N https://localhost:8443/logs +``` ### TCP Stream + ```bash -# Simple TCP client +# Using netcat nc localhost 9090 # Using telnet telnet localhost 9090 -# Using socat -socat - TCP:localhost:9090 +# With TLS (when implemented) +openssl s_client -connect localhost:9443 ``` -### HTTP/SSE Stream -```bash -# Stream logs -curl -N http://localhost:8080/stream +### JavaScript Client -# Check status -curl http://localhost:8080/status +```javascript +const eventSource = new EventSource('http://localhost:8080/stream'); + +eventSource.addEventListener('connected', (e) => { + const data = JSON.parse(e.data); + console.log('Connected with ID:', data.client_id); +}); + +eventSource.addEventListener('message', (e) => { + const logEntry = JSON.parse(e.data); + console.log(`[${logEntry.time}] ${logEntry.level}: ${logEntry.message}`); +}); ``` -## Environment Variables - -All config values can be set via environment: -- `LOGWISP_MONITOR_CHECK_INTERVAL_MS` -- `LOGWISP_MONITOR_TARGETS` (format: "path:pattern:isfile,...") -- `LOGWISP_TCPSERVER_ENABLED` -- `LOGWISP_TCPSERVER_PORT` -- `LOGWISP_HTTPSERVER_ENABLED` -- `LOGWISP_HTTPSERVER_PORT` - ## Log Entry Format +All log entries are streamed as JSON: + ```json { "time": "2024-01-01T12:00:00.123456Z", "source": "app.log", - "level": "error", - "message": "Something went wrong", - "fields": {"key": "value"} + "level": "ERROR", + "message": "Connection timeout", + "fields": { + "user_id": "12345", + "request_id": "abc-def-ghi" + } } ``` ## API Endpoints -### TCP Protocol -- Raw JSON lines, one entry per line -- No headers or authentication -- Instant connection, streaming starts immediately +### Stream Endpoints (per stream) -### HTTP Endpoints -- `GET /stream` - SSE stream of log entries -- `GET /status` - Service status JSON +- `GET {stream_path}` - SSE log stream +- `GET {status_path}` - Stream statistics and configuration -### SSE Events -- `connected` - Initial connection with client_id -- `data` - Log entry JSON -- `:` - Heartbeat comment (30s interval) +### Global Endpoints (router mode) -## Heartbeat Configuration +- `GET /status` - Aggregated status for all streams +- `GET /{stream_name}/{path}` - Stream-specific endpoints -LogWisp supports configurable heartbeat messages for both HTTP/SSE and TCP streams to detect stale connections and provide server statistics. +### Status Response -**HTTP/SSE Heartbeat:** -- **Format Options:** - - `comment`: SSE comment format (`: heartbeat ...`) - - `json`: Standard data message with JSON payload -- **Content Options:** - - `include_timestamp`: Add current UTC timestamp - - `include_stats`: Add active clients count and server uptime - -**TCP Heartbeat:** -- Always uses JSON format -- Same content options as HTTP -- Useful for detecting disconnected clients - -**⚠️ SECURITY:** Heartbeat statistics expose minimal server state (connection count, uptime). If this is sensitive in your environment, disable `include_stats`. - -**Example Heartbeat Messages:** - -HTTP Comment format: -``` -: heartbeat 2024-01-01T12:00:00Z clients=5 uptime=3600s -``` - -JSON format: ```json -{"type":"heartbeat","timestamp":"2024-01-01T12:00:00Z","active_clients":5,"uptime_seconds":3600} +{ + "service": "LogWisp", + "version": "3.0.0", + "server": { + "type": "http", + "port": 8080, + "active_clients": 5, + "uptime_seconds": 3600 + }, + "monitor": { + "active_watchers": 3, + "total_entries": 15420, + "dropped_entries": 0 + } +} ``` -**Configuration:** +## Real-time Statistics + +LogWisp provides comprehensive statistics at multiple levels: + +- **Per-Stream Stats**: Monitor performance, connection counts, data throughput +- **Per-Watcher Stats**: File size, position, entries read, rotation count +- **Global Stats**: Aggregated view of all streams (in router mode) + +Access statistics via status endpoints or watch the console output: + +``` +[15:04:05] Active streams: 2 + app: watchers=3 entries=1542 tcp_conns=2 http_conns=5 + system: watchers=2 entries=8901 tcp_conns=0 http_conns=3 +``` + +## Advanced Features + +### File Rotation Detection + +LogWisp automatically detects log rotation through multiple methods: +- Inode change detection +- File size decrease +- Modification time anomalies +- Position beyond file size + +When rotation is detected, a special log entry is generated: +```json +{ + "level": "INFO", + "message": "Log rotation detected (#1): inode change" +} +``` + +### Buffer Management + +- **Non-blocking delivery**: Messages are dropped rather than blocking when buffers fill +- **Per-client buffers**: Each client has independent buffer space +- **Configurable sizes**: Adjust buffer sizes based on expected load + +### Heartbeat Messages + +Keep connections alive and detect stale clients: + ```toml -[httpserver.heartbeat] +[streams.httpserver.heartbeat] enabled = true interval_seconds = 30 include_timestamp = true include_stats = true -format = "json" +format = "json" # or "comment" for SSE comments ``` -**Environment Variables:** -- `LOGWISP_HTTPSERVER_HEARTBEAT_ENABLED` -- `LOGWISP_HTTPSERVER_HEARTBEAT_INTERVAL_SECONDS` -- `LOGWISP_TCPSERVER_HEARTBEAT_ENABLED` -- `LOGWISP_TCPSERVER_HEARTBEAT_INTERVAL_SECONDS` +## Performance Tuning + +### Monitor Settings +- `check_interval_ms`: Lower values = faster detection, higher CPU usage +- `buffer_size`: Larger buffers handle bursts better but use more memory + +### File Watcher Optimization +- Use specific file paths when possible (more efficient than directory scanning) +- Adjust patterns to minimize unnecessary file checks +- Consider separate streams for different update frequencies + +### Network Optimization +- TCP: Best for high-volume, low-latency requirements +- HTTP/SSE: Best for browser compatibility and firewall traversal +- Router mode: Reduces port usage but adds slight routing overhead + +## Building from Source + +```bash +# Clone repository +git clone https://github.com/yourusername/logwisp +cd logwisp + +# Install dependencies +go mod init logwisp +go get github.com/panjf2000/gnet/v2 +go get github.com/valyala/fasthttp +go get github.com/lixenwraith/config + +# Build +go build -o logwisp ./src/cmd/logwisp + +# Run tests +go test ./... +``` ## Deployment ### Systemd Service + ```ini [Unit] -Description=LogWisp Log Streaming +Description=LogWisp Multi-Stream Log Monitor After=network.target [Service] Type=simple -ExecStart=/usr/local/bin/logwisp --enable-tcp --enable-http +ExecStart=/usr/local/bin/logwisp --config /etc/logwisp/production.toml Restart=always -Environment="LOGWISP_TCPSERVER_PORT=9090" -Environment="LOGWISP_HTTPSERVER_PORT=8080" +User=logwisp +Group=logwisp + +# Security hardening +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=strict +ProtectHome=true +ReadOnlyPaths=/var/log [Install] WantedBy=multi-user.target ``` ### Docker + ```dockerfile FROM golang:1.24 AS builder WORKDIR /app @@ -221,56 +359,88 @@ COPY . . RUN go build -o logwisp ./src/cmd/logwisp FROM debian:bookworm-slim +RUN useradd -r -s /bin/false logwisp COPY --from=builder /app/logwisp /usr/local/bin/ +USER logwisp EXPOSE 8080 9090 -CMD ["logwisp", "--enable-tcp", "--enable-http"] +CMD ["logwisp"] ``` -## Performance Tuning +### Docker Compose -- **Buffer Size**: Increase for burst traffic (5000+) -- **Check Interval**: Decrease for lower latency (10-50ms) -- **TCP**: Best for high-volume system consumers -- **HTTP**: Best for web browsers and REST clients - -### Message Dropping and Client Behavior - -LogWisp uses non-blocking message delivery to maintain system stability. When a client cannot keep up with the log stream, messages are dropped rather than blocking other clients or the monitor. - -**Common causes of dropped messages:** -- **Browser throttling**: Browsers may throttle background tabs, reducing JavaScript execution frequency -- **Network congestion**: Slow connections or high latency can cause client buffers to fill -- **Client processing**: Heavy client-side processing (parsing, rendering) can create backpressure -- **System resources**: CPU/memory constraints on client machines affect consumption rate - -**TCP vs HTTP behavior:** -- **TCP**: Raw stream with kernel-level buffering. Drops occur when TCP send buffer fills -- **HTTP/SSE**: Application-level buffering. Each client has a dedicated channel (default: 1000 entries) - -**Mitigation strategies:** -1. Increase buffer sizes for burst tolerance: `--tcp-buffer-size 5000` or `--http-buffer-size 5000` -2. Implement client-side flow control (pause/resume based on queue depth) -3. Use TCP for high-volume consumers that need guaranteed delivery -4. Keep browser tabs in foreground for real-time monitoring -5. Consider log aggregation/filtering at source for high-volume scenarios - -**Monitoring drops:** -- HTTP: Check `/status` endpoint for drop statistics -- TCP: Monitor connection count and system TCP metrics -- Both: Watch for "channel full" indicators in client implementations - -## Building from Source - -```bash -git clone https://github.com/yourusername/logwisp -cd logwisp -go mod init logwisp -go get github.com/panjf2000/gnet/v2 -go get github.com/valyala/fasthttp -go get github.com/lixenwraith/config -go build -o logwisp ./src/cmd/logwisp +```yaml +version: '3.8' +services: + logwisp: + build: . + volumes: + - /var/log:/var/log:ro + - ./config.toml:/etc/logwisp/config.toml:ro + ports: + - "8080:8080" + - "9090:9090" + restart: unless-stopped + command: ["logwisp", "--config", "/etc/logwisp/config.toml"] ``` +## Security Considerations + +### Current Implementation +- Read-only file access +- No authentication (placeholder configuration only) +- No TLS/SSL support (placeholder configuration only) + +### Planned Security Features +- **Authentication**: Basic, Bearer/JWT, mTLS +- **TLS/SSL**: For both HTTP and TCP streams +- **Rate Limiting**: Per-client request limits +- **IP Filtering**: Whitelist/blacklist support +- **Audit Logging**: Access and authentication events + +### Best Practices +1. Run with minimal privileges (read-only access to log files) +2. Use network-level security until authentication is implemented +3. Place behind a reverse proxy for production HTTPS +4. Monitor access logs for unusual patterns +5. Regularly update dependencies + +## Troubleshooting + +### No Log Entries Appearing +1. Check file permissions (LogWisp needs read access) +2. Verify file paths in configuration +3. Ensure files match the specified patterns +4. Check monitor statistics in status endpoint + +### High Memory Usage +1. Reduce buffer sizes in configuration +2. Lower the number of concurrent watchers +3. Increase check interval for less critical logs +4. Use TCP instead of HTTP for high-volume streams + +### Connection Drops +1. Check heartbeat configuration +2. Verify network stability +3. Monitor client-side errors +4. Review dropped entry statistics + ## License -BSD-3-Clause \ No newline at end of file +BSD-3-Clause + +## Contributing + +Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository. + +## Roadmap + +- [x] Multi-stream architecture +- [x] File and directory monitoring +- [x] TCP and HTTP/SSE streaming +- [x] Path-based HTTP routing +- [ ] Authentication (Basic, JWT, mTLS) +- [ ] TLS/SSL support +- [ ] Rate limiting +- [ ] Prometheus metrics export +- [ ] WebSocket support +- [ ] Log filtering and transformation \ No newline at end of file diff --git a/config/logwisp.toml b/config/logwisp.toml index aed8653..d8d493f 100644 --- a/config/logwisp.toml +++ b/config/logwisp.toml @@ -1,133 +1,121 @@ -# LogWisp Configuration Template -# Default location: ~/.config/logwisp.toml -# -# Configuration precedence (highest to lowest): -# 1. Command-line arguments -# 2. Environment variables (LOGWISP_ prefix) -# 3. This configuration file -# 4. Built-in defaults +# LogWisp Multi-Stream Configuration +# Location: ~/.config/logwisp.toml +# Global monitor defaults [monitor] -# File check interval (milliseconds) -# Lower = more responsive, higher CPU usage -# Environment: LOGWISP_MONITOR_CHECK_INTERVAL_MS -# CLI: --check-interval MS check_interval_ms = 100 -# Monitor targets -# Environment: LOGWISP_MONITOR_TARGETS="path:pattern:isfile,path2:pattern2:isfile" -# CLI: logwisp [path[:pattern[:isfile]]] ... -[[monitor.targets]] -path = "./" # Directory or file path -pattern = "*.log" # Glob pattern (ignored for files) -is_file = false # true = file, false = directory +# Stream 1: Application logs (public access) +[[streams]] +name = "app" -# # Example: Specific file -# [[monitor.targets]] -# path = "/var/log/app.log" -# pattern = "" -# is_file = true +[streams.monitor] +targets = [ + { path = "/var/log/myapp", pattern = "*.log", is_file = false }, + { path = "/var/log/myapp/app.log", pattern = "", is_file = true } +] -# # Example: System logs -# [[monitor.targets]] -# path = "/var/log" -# pattern = "*.log" -# is_file = false - -[tcpserver] -# Raw TCP streaming server (gnet) -# Environment: LOGWISP_TCPSERVER_ENABLED -# CLI: --enable-tcp -enabled = false - -# TCP port -# Environment: LOGWISP_TCPSERVER_PORT -# CLI: --tcp-port PORT -port = 9090 - -# Per-client buffer size -# Environment: LOGWISP_TCPSERVER_BUFFER_SIZE -# CLI: --tcp-buffer-size SIZE -buffer_size = 1000 - -# TLS/SSL settings (not implemented in PoC) -ssl_enabled = false -ssl_cert_file = "" -ssl_key_file = "" - -[tcpserver.heartbeat] -# Enable/disable heartbeat messages -# Environment: LOGWISP_TCPSERVER_HEARTBEAT_ENABLED -enabled = false - -# Heartbeat interval in seconds -# Environment: LOGWISP_TCPSERVER_HEARTBEAT_INTERVAL_SECONDS -interval_seconds = 30 - -# Include timestamp in heartbeat -# Environment: LOGWISP_TCPSERVER_HEARTBEAT_INCLUDE_TIMESTAMP -include_timestamp = true - -# Include server statistics (active connections, uptime) -# Environment: LOGWISP_TCPSERVER_HEARTBEAT_INCLUDE_STATS -include_stats = false - -# Format: "json" only for TCP -# Environment: LOGWISP_TCPSERVER_HEARTBEAT_FORMAT -format = "json" - -[httpserver] -# HTTP/SSE streaming server (fasthttp) -# Environment: LOGWISP_HTTPSERVER_ENABLED -# CLI: --enable-http +[streams.httpserver] enabled = true - -# HTTP port -# Environment: LOGWISP_HTTPSERVER_PORT -# CLI: --http-port PORT (or legacy --port) port = 8080 +buffer_size = 2000 +stream_path = "/stream" +status_path = "/status" -# Per-client buffer size -# Environment: LOGWISP_HTTPSERVER_BUFFER_SIZE -# CLI: --http-buffer-size SIZE (or legacy --buffer-size) -buffer_size = 1000 - -# TLS/SSL settings (not implemented in PoC) -ssl_enabled = false -ssl_cert_file = "" -ssl_key_file = "" - -[httpserver.heartbeat] -# Enable/disable heartbeat messages -# Environment: LOGWISP_HTTPSERVER_HEARTBEAT_ENABLED +[streams.httpserver.heartbeat] enabled = true - -# Heartbeat interval in seconds -# Environment: LOGWISP_HTTPSERVER_HEARTBEAT_INTERVAL_SECONDS interval_seconds = 30 - -# Include timestamp in heartbeat -# Environment: LOGWISP_HTTPSERVER_HEARTBEAT_INCLUDE_TIMESTAMP -include_timestamp = true - -# Include server statistics (active clients, uptime) -# Environment: LOGWISP_HTTPSERVER_HEARTBEAT_INCLUDE_STATS -include_stats = false - -# Format: "comment" (SSE comment) or "json" (data message) -# Environment: LOGWISP_HTTPSERVER_HEARTBEAT_FORMAT format = "comment" -# Production example: -# [tcpserver] -# enabled = true -# port = 9090 -# buffer_size = 5000 +# Stream 2: System logs (authenticated) +[[streams]] +name = "system" + +[streams.monitor] +check_interval_ms = 50 # More frequent checks +targets = [ + { path = "/var/log", pattern = "syslog*", is_file = false }, + { path = "/var/log/auth.log", pattern = "", is_file = true } +] + +[streams.httpserver] +enabled = true +port = 8443 +buffer_size = 5000 +stream_path = "/logs" +status_path = "/health" + +# SSL placeholder +[streams.httpserver.ssl] +enabled = true +cert_file = "/etc/logwisp/certs/server.crt" +key_file = "/etc/logwisp/certs/server.key" +min_version = "TLS1.2" + +# Authentication placeholder +[streams.auth] +type = "basic" + +[streams.auth.basic_auth] +realm = "System Logs" +users = [ + { username = "admin", password_hash = "$2y$10$..." } +] +ip_whitelist = ["10.0.0.0/8", "192.168.0.0/16"] + +# TCP server also available +[streams.tcpserver] +enabled = true +port = 9443 +buffer_size = 5000 + +[streams.tcpserver.heartbeat] +enabled = false + +# Stream 3: Debug logs (high-volume, no heartbeat) +[[streams]] +name = "debug" + +[streams.monitor] +targets = [ + { path = "./debug", pattern = "*.debug", is_file = false } +] + +[streams.httpserver] +enabled = true +port = 8082 +buffer_size = 10000 +stream_path = "/stream" +status_path = "/status" + +[streams.httpserver.heartbeat] +enabled = false # Disable for high-volume + +# Rate limiting placeholder +[streams.httpserver.rate_limit] +enabled = true +requests_per_second = 100.0 +burst_size = 1000 +limit_by = "ip" + +# Usage Examples: # -# [httpserver] -# enabled = true -# port = 443 -# buffer_size = 5000 -# ssl_enabled = true -# ssl_cert_file = "/etc/ssl/certs/logwisp.crt" -# ssl_key_file = "/etc/ssl/private/logwisp.key" \ No newline at end of file +# 1. Standard mode (each stream on its own port): +# ./logwisp +# - App logs: http://localhost:8080/stream +# - System logs: https://localhost:8443/logs (with auth) +# - Debug logs: http://localhost:8082/stream +# +# 2. Router mode (shared port with path routing): +# ./logwisp --router +# - App logs: http://localhost:8080/app/stream +# - System logs: http://localhost:8080/system/logs +# - Debug logs: http://localhost:8080/debug/stream +# - Global status: http://localhost:8080/status +# +# 3. Override config file: +# ./logwisp --config /etc/logwisp/production.toml +# +# 4. Environment variables: +# LOGWISP_MONITOR_CHECK_INTERVAL_MS=50 +# LOGWISP_STREAMS_0_HTTPSERVER_PORT=8090 \ No newline at end of file diff --git a/src/cmd/logwisp/main.go b/src/cmd/logwisp/main.go index 1185f58..7460015 100644 --- a/src/cmd/logwisp/main.go +++ b/src/cmd/logwisp/main.go @@ -7,27 +7,19 @@ import ( "fmt" "os" "os/signal" - "sync" "syscall" "time" "logwisp/src/internal/config" - "logwisp/src/internal/monitor" - "logwisp/src/internal/stream" + "logwisp/src/internal/logstream" ) func main() { // Parse CLI flags var ( configFile = flag.String("config", "", "Config file path") - // Flags - httpPort = flag.Int("http-port", 0, "HTTP server port") - httpBuffer = flag.Int("http-buffer-size", 0, "HTTP server buffer size") - tcpPort = flag.Int("tcp-port", 0, "TCP server port") - tcpBuffer = flag.Int("tcp-buffer-size", 0, "TCP server buffer size") - enableTCP = flag.Bool("enable-tcp", false, "Enable TCP server") - enableHTTP = flag.Bool("enable-http", false, "Enable HTTP server") - checkInterval = flag.Int("check-interval", 0, "File check interval in ms") + useRouter = flag.Bool("router", false, "Use HTTP router for path-based routing") + // routerPort = flag.Int("router-port", 0, "Override router port (default: first HTTP port)") ) flag.Parse() @@ -35,39 +27,8 @@ func main() { os.Setenv("LOGWISP_CONFIG_FILE", *configFile) } - // Build CLI args for config - var cliArgs []string - - // Flags - if *httpPort > 0 { - cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.port=%d", *httpPort)) - } - if *httpBuffer > 0 { - cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.buffer_size=%d", *httpBuffer)) - } - if *tcpPort > 0 { - cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.port=%d", *tcpPort)) - } - if *tcpBuffer > 0 { - cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.buffer_size=%d", *tcpBuffer)) - } - if flag.Lookup("enable-tcp").DefValue != flag.Lookup("enable-tcp").Value.String() { - cliArgs = append(cliArgs, fmt.Sprintf("--tcpserver.enabled=%v", *enableTCP)) - } - if flag.Lookup("enable-http").DefValue != flag.Lookup("enable-http").Value.String() { - cliArgs = append(cliArgs, fmt.Sprintf("--httpserver.enabled=%v", *enableHTTP)) - } - if *checkInterval > 0 { - cliArgs = append(cliArgs, fmt.Sprintf("--monitor.check_interval_ms=%d", *checkInterval)) - } - - // Parse monitor targets from remaining args - for _, arg := range flag.Args() { - cliArgs = append(cliArgs, fmt.Sprintf("--monitor.targets.add=%s", arg)) - } - // Load configuration - cfg, err := config.LoadWithCLI(cliArgs) + cfg, err := config.LoadWithCLI(os.Args[1:]) if err != nil { fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err) os.Exit(1) @@ -81,147 +42,154 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - // Create monitor - mon := monitor.New() - mon.SetCheckInterval(time.Duration(cfg.Monitor.CheckIntervalMs) * time.Millisecond) + // Create log stream service + service := logstream.New(ctx) - // Add targets - for _, target := range cfg.Monitor.Targets { - if err := mon.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil { - fmt.Fprintf(os.Stderr, "Failed to add target %s: %v\n", target.Path, err) - } + // Create HTTP router if requested + var router *logstream.HTTPRouter + if *useRouter { + router = logstream.NewHTTPRouter(service) + fmt.Println("HTTP router mode enabled") } - // Start monitor - if err := mon.Start(ctx); err != nil { - fmt.Fprintf(os.Stderr, "Failed to start monitor: %v\n", err) + // Initialize streams + successCount := 0 + for _, streamCfg := range cfg.Streams { + fmt.Printf("Initializing stream '%s'...\n", streamCfg.Name) + + // Set router mode BEFORE creating stream + if *useRouter && streamCfg.HTTPServer != nil && streamCfg.HTTPServer.Enabled { + // Temporarily disable standalone server startup + originalEnabled := streamCfg.HTTPServer.Enabled + streamCfg.HTTPServer.Enabled = false + + if err := service.CreateStream(streamCfg); err != nil { + fmt.Fprintf(os.Stderr, "Failed to create stream '%s': %v\n", streamCfg.Name, err) + continue + } + + // Get the created stream and configure for router mode + stream, _ := service.GetStream(streamCfg.Name) + if stream.HTTPServer != nil { + stream.HTTPServer.SetRouterMode() + // Restore enabled state + stream.Config.HTTPServer.Enabled = originalEnabled + + if err := router.RegisterStream(stream); err != nil { + fmt.Fprintf(os.Stderr, "Failed to register stream '%s' with router: %v\n", + streamCfg.Name, err) + } else { + fmt.Printf("Stream '%s' registered with router\n", streamCfg.Name) + } + } + } else { + // Standard standalone mode + if err := service.CreateStream(streamCfg); err != nil { + fmt.Fprintf(os.Stderr, "Failed to create stream '%s': %v\n", streamCfg.Name, err) + continue + } + } + + successCount++ + + // Display endpoints + displayStreamEndpoints(streamCfg, *useRouter) + } + + if successCount == 0 { + fmt.Fprintln(os.Stderr, "No streams successfully started") os.Exit(1) } - var tcpServer *stream.TCPStreamer - var httpServer *stream.HTTPStreamer + fmt.Printf("\n%d stream(s) running. Press Ctrl+C to stop.\n", successCount) - // Start TCP server if enabled - if cfg.TCPServer.Enabled { - tcpChan := mon.Subscribe() - tcpServer = stream.NewTCPStreamer(tcpChan, cfg.TCPServer) - - // Start TCP server in separate goroutine without blocking wg.Wait() - tcpStarted := make(chan error, 1) - go func() { - tcpStarted <- tcpServer.Start() - }() - - // Check if TCP server started successfully - select { - case err := <-tcpStarted: - if err != nil { - fmt.Fprintf(os.Stderr, "TCP server failed to start: %v\n", err) - os.Exit(1) - } - case <-time.After(1 * time.Second): - // Server is running - } - - fmt.Printf("TCP streaming on port %d\n", cfg.TCPServer.Port) - } - - // Start HTTP server if enabled - if cfg.HTTPServer.Enabled { - httpChan := mon.Subscribe() - httpServer = stream.NewHTTPStreamer(httpChan, cfg.HTTPServer) - - // Start HTTP server in separate goroutine without blocking wg.Wait() - httpStarted := make(chan error, 1) - go func() { - httpStarted <- httpServer.Start() - }() - - // Check if HTTP server started successfully - select { - case err := <-httpStarted: - if err != nil { - fmt.Fprintf(os.Stderr, "HTTP server failed to start: %v\n", err) - os.Exit(1) - } - case <-time.After(1 * time.Second): - // Server is running - } - - fmt.Printf("HTTP/SSE streaming on http://localhost:%d/stream\n", cfg.HTTPServer.Port) - fmt.Printf("Status available at http://localhost:%d/status\n", cfg.HTTPServer.Port) - } - - if !cfg.TCPServer.Enabled && !cfg.HTTPServer.Enabled { - fmt.Fprintln(os.Stderr, "No servers enabled. Enable at least one server in config.") - os.Exit(1) - } + // Start periodic status display + go statusReporter(service) // Wait for shutdown <-sigChan fmt.Println("\nShutting down...") - // Create shutdown group for concurrent server stops - var shutdownWg sync.WaitGroup - - // Stop servers first (concurrently) - if tcpServer != nil { - shutdownWg.Add(1) - go func() { - defer shutdownWg.Done() - tcpServer.Stop() - }() - } - if httpServer != nil { - shutdownWg.Add(1) - go func() { - defer shutdownWg.Done() - httpServer.Stop() - }() + // Shutdown router first if using it + if router != nil { + fmt.Println("Shutting down HTTP router...") + router.Shutdown() } - // Cancel context to stop monitor - cancel() + // Shutdown service (handles all streams) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() - // Wait for servers to stop with timeout - serversDone := make(chan struct{}) + done := make(chan struct{}) go func() { - shutdownWg.Wait() - close(serversDone) + service.Shutdown() + close(done) }() - // Stop monitor after context cancellation - monitorDone := make(chan struct{}) - go func() { - mon.Stop() - close(monitorDone) - }() + select { + case <-done: + fmt.Println("Shutdown complete") + case <-shutdownCtx.Done(): + fmt.Println("Shutdown timeout - forcing exit") + os.Exit(1) + } +} - // Wait for all components with proper timeout - shutdownTimeout := 5 * time.Second - shutdownTimer := time.NewTimer(shutdownTimeout) - defer shutdownTimer.Stop() +func displayStreamEndpoints(cfg config.StreamConfig, routerMode bool) { + if cfg.TCPServer != nil && cfg.TCPServer.Enabled { + fmt.Printf(" TCP: port %d\n", cfg.TCPServer.Port) + } - serversShutdown := false - monitorShutdown := false + if cfg.HTTPServer != nil && cfg.HTTPServer.Enabled { + if routerMode { + fmt.Printf(" HTTP: /%s%s (stream), /%s%s (status)\n", + cfg.Name, cfg.HTTPServer.StreamPath, + cfg.Name, cfg.HTTPServer.StatusPath) + } else { + fmt.Printf(" HTTP: http://localhost:%d%s (stream), http://localhost:%d%s (status)\n", + cfg.HTTPServer.Port, cfg.HTTPServer.StreamPath, + cfg.HTTPServer.Port, cfg.HTTPServer.StatusPath) + } - for !serversShutdown || !monitorShutdown { - select { - case <-serversDone: - serversShutdown = true - case <-monitorDone: - monitorShutdown = true - case <-shutdownTimer.C: - if !serversShutdown { - fmt.Println("Warning: Server shutdown timeout") - } - if !monitorShutdown { - fmt.Println("Warning: Monitor shutdown timeout") - } - fmt.Println("Forcing exit") - os.Exit(1) + if cfg.Auth != nil && cfg.Auth.Type != "none" { + fmt.Printf(" Auth: %s\n", cfg.Auth.Type) } } +} - fmt.Println("Shutdown complete") +func statusReporter(service *logstream.Service) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + stats := service.GetGlobalStats() + totalStreams := stats["total_streams"].(int) + if totalStreams == 0 { + return + } + + fmt.Printf("\n[%s] Active streams: %d\n", + time.Now().Format("15:04:05"), totalStreams) + + for name, streamStats := range stats["streams"].(map[string]interface{}) { + s := streamStats.(map[string]interface{}) + fmt.Printf(" %s: ", name) + + if monitor, ok := s["monitor"].(map[string]interface{}); ok { + fmt.Printf("watchers=%d entries=%d ", + monitor["active_watchers"], + monitor["total_entries"]) + } + + if tcp, ok := s["tcp"].(map[string]interface{}); ok && tcp["enabled"].(bool) { + fmt.Printf("tcp_conns=%d ", tcp["connections"]) + } + + if http, ok := s["http"].(map[string]interface{}); ok && http["enabled"].(bool) { + fmt.Printf("http_conns=%d ", http["connections"]) + } + + fmt.Println() + } + } } \ No newline at end of file diff --git a/src/internal/config/auth.go b/src/internal/config/auth.go new file mode 100644 index 0000000..a37ba70 --- /dev/null +++ b/src/internal/config/auth.go @@ -0,0 +1,56 @@ +// FILE: src/internal/config/auth.go +package config + +type AuthConfig struct { + // Authentication type: "none", "basic", "bearer", "mtls" + Type string `toml:"type"` + + // Basic auth + BasicAuth *BasicAuthConfig `toml:"basic_auth"` + + // Bearer token auth + BearerAuth *BearerAuthConfig `toml:"bearer_auth"` + + // IP-based access control + IPWhitelist []string `toml:"ip_whitelist"` + IPBlacklist []string `toml:"ip_blacklist"` +} + +type BasicAuthConfig struct { + // Static users (for simple deployments) + Users []BasicAuthUser `toml:"users"` + + // External auth file + UsersFile string `toml:"users_file"` + + // Realm for WWW-Authenticate header + Realm string `toml:"realm"` +} + +type BasicAuthUser struct { + Username string `toml:"username"` + // Password hash (bcrypt) + PasswordHash string `toml:"password_hash"` +} + +type BearerAuthConfig struct { + // Static tokens + Tokens []string `toml:"tokens"` + + // JWT validation + JWT *JWTConfig `toml:"jwt"` +} + +type JWTConfig struct { + // JWKS URL for key discovery + JWKSURL string `toml:"jwks_url"` + + // Static signing key (if not using JWKS) + SigningKey string `toml:"signing_key"` + + // Expected issuer + Issuer string `toml:"issuer"` + + // Expected audience + Audience string `toml:"audience"` +} \ No newline at end of file diff --git a/src/internal/config/config.go b/src/internal/config/config.go index 03a3ca3..0ab5e19 100644 --- a/src/internal/config/config.go +++ b/src/internal/config/config.go @@ -1,245 +1,14 @@ // FILE: src/internal/config/config.go package config -import ( - "fmt" - "os" - "path/filepath" - "strings" - - lconfig "github.com/lixenwraith/config" -) - type Config struct { - Monitor MonitorConfig `toml:"monitor"` - TCPServer TCPConfig `toml:"tcpserver"` - HTTPServer HTTPConfig `toml:"httpserver"` + // Global monitor settings + Monitor MonitorConfig `toml:"monitor"` + + // Stream configurations + Streams []StreamConfig `toml:"streams"` } type MonitorConfig struct { - CheckIntervalMs int `toml:"check_interval_ms"` - Targets []MonitorTarget `toml:"targets"` -} - -type MonitorTarget struct { - Path string `toml:"path"` - Pattern string `toml:"pattern"` - IsFile bool `toml:"is_file"` -} - -type TCPConfig struct { - Enabled bool `toml:"enabled"` - Port int `toml:"port"` - BufferSize int `toml:"buffer_size"` - SSLEnabled bool `toml:"ssl_enabled"` - SSLCertFile string `toml:"ssl_cert_file"` - SSLKeyFile string `toml:"ssl_key_file"` - Heartbeat HeartbeatConfig `toml:"heartbeat"` -} - -type HTTPConfig struct { - Enabled bool `toml:"enabled"` - Port int `toml:"port"` - BufferSize int `toml:"buffer_size"` - SSLEnabled bool `toml:"ssl_enabled"` - SSLCertFile string `toml:"ssl_cert_file"` - SSLKeyFile string `toml:"ssl_key_file"` - Heartbeat HeartbeatConfig `toml:"heartbeat"` -} - -type HeartbeatConfig struct { - Enabled bool `toml:"enabled"` - IntervalSeconds int `toml:"interval_seconds"` - IncludeTimestamp bool `toml:"include_timestamp"` - IncludeStats bool `toml:"include_stats"` - Format string `toml:"format"` // "comment" or "json" -} - -func defaults() *Config { - return &Config{ - Monitor: MonitorConfig{ - CheckIntervalMs: 100, - Targets: []MonitorTarget{ - {Path: "./", Pattern: "*.log", IsFile: false}, - }, - }, - TCPServer: TCPConfig{ - Enabled: false, - Port: 9090, - BufferSize: 1000, - Heartbeat: HeartbeatConfig{ - Enabled: false, - IntervalSeconds: 30, - IncludeTimestamp: true, - IncludeStats: false, - Format: "json", - }, - }, - HTTPServer: HTTPConfig{ - Enabled: true, - Port: 8080, - BufferSize: 1000, - Heartbeat: HeartbeatConfig{ - Enabled: true, - IntervalSeconds: 30, - IncludeTimestamp: true, - IncludeStats: false, - Format: "comment", - }, - }, - } -} - -func LoadWithCLI(cliArgs []string) (*Config, error) { - configPath := GetConfigPath() - - cfg, err := lconfig.NewBuilder(). - WithDefaults(defaults()). - WithEnvPrefix("LOGWISP_"). - WithFile(configPath). - WithArgs(cliArgs). - WithEnvTransform(customEnvTransform). - WithSources( - lconfig.SourceCLI, - lconfig.SourceEnv, - lconfig.SourceFile, - lconfig.SourceDefault, - ). - Build() - - if err != nil { - if !strings.Contains(err.Error(), "not found") { - return nil, fmt.Errorf("failed to load config: %w", err) - } - } - - if err := handleMonitorTargetsEnv(cfg); err != nil { - return nil, err - } - - finalConfig := &Config{} - if err := cfg.Scan("", finalConfig); err != nil { - return nil, fmt.Errorf("failed to scan config: %w", err) - } - - return finalConfig, finalConfig.validate() -} - -func customEnvTransform(path string) string { - env := strings.ReplaceAll(path, ".", "_") - env = strings.ToUpper(env) - env = "LOGWISP_" + env - return env -} - -func GetConfigPath() string { - if configFile := os.Getenv("LOGWISP_CONFIG_FILE"); configFile != "" { - if filepath.IsAbs(configFile) { - return configFile - } - if configDir := os.Getenv("LOGWISP_CONFIG_DIR"); configDir != "" { - return filepath.Join(configDir, configFile) - } - return configFile - } - - if configDir := os.Getenv("LOGWISP_CONFIG_DIR"); configDir != "" { - return filepath.Join(configDir, "logwisp.toml") - } - - if homeDir, err := os.UserHomeDir(); err == nil { - return filepath.Join(homeDir, ".config", "logwisp.toml") - } - - return "logwisp.toml" -} - -func handleMonitorTargetsEnv(cfg *lconfig.Config) error { - if targetsStr := os.Getenv("LOGWISP_MONITOR_TARGETS"); targetsStr != "" { - cfg.Set("monitor.targets", []MonitorTarget{}) - - parts := strings.Split(targetsStr, ",") - for i, part := range parts { - targetParts := strings.Split(part, ":") - if len(targetParts) >= 1 && targetParts[0] != "" { - path := fmt.Sprintf("monitor.targets.%d.path", i) - cfg.Set(path, targetParts[0]) - - if len(targetParts) >= 2 && targetParts[1] != "" { - pattern := fmt.Sprintf("monitor.targets.%d.pattern", i) - cfg.Set(pattern, targetParts[1]) - } else { - pattern := fmt.Sprintf("monitor.targets.%d.pattern", i) - cfg.Set(pattern, "*.log") - } - - if len(targetParts) >= 3 { - isFile := fmt.Sprintf("monitor.targets.%d.is_file", i) - cfg.Set(isFile, targetParts[2] == "true") - } else { - isFile := fmt.Sprintf("monitor.targets.%d.is_file", i) - cfg.Set(isFile, false) - } - } - } - } - - return nil -} - -func (c *Config) validate() error { - if c.Monitor.CheckIntervalMs < 10 { - return fmt.Errorf("check interval too small: %d ms", c.Monitor.CheckIntervalMs) - } - - if len(c.Monitor.Targets) == 0 { - return fmt.Errorf("no monitor targets specified") - } - - for i, target := range c.Monitor.Targets { - if target.Path == "" { - return fmt.Errorf("target %d: empty path", i) - } - if strings.Contains(target.Path, "..") { - return fmt.Errorf("target %d: path contains directory traversal", i) - } - } - - if c.TCPServer.Enabled { - if c.TCPServer.Port < 1 || c.TCPServer.Port > 65535 { - return fmt.Errorf("invalid TCP port: %d", c.TCPServer.Port) - } - if c.TCPServer.BufferSize < 1 { - return fmt.Errorf("TCP buffer size must be positive: %d", c.TCPServer.BufferSize) - } - } - - if c.HTTPServer.Enabled { - if c.HTTPServer.Port < 1 || c.HTTPServer.Port > 65535 { - return fmt.Errorf("invalid HTTP port: %d", c.HTTPServer.Port) - } - if c.HTTPServer.BufferSize < 1 { - return fmt.Errorf("HTTP buffer size must be positive: %d", c.HTTPServer.BufferSize) - } - } - - if c.TCPServer.Enabled && c.TCPServer.Heartbeat.Enabled { - if c.TCPServer.Heartbeat.IntervalSeconds < 1 { - return fmt.Errorf("TCP heartbeat interval must be positive: %d", c.TCPServer.Heartbeat.IntervalSeconds) - } - if c.TCPServer.Heartbeat.Format != "json" && c.TCPServer.Heartbeat.Format != "comment" { - return fmt.Errorf("TCP heartbeat format must be 'json' or 'comment': %s", c.TCPServer.Heartbeat.Format) - } - } - - if c.HTTPServer.Enabled && c.HTTPServer.Heartbeat.Enabled { - if c.HTTPServer.Heartbeat.IntervalSeconds < 1 { - return fmt.Errorf("HTTP heartbeat interval must be positive: %d", c.HTTPServer.Heartbeat.IntervalSeconds) - } - if c.HTTPServer.Heartbeat.Format != "json" && c.HTTPServer.Heartbeat.Format != "comment" { - return fmt.Errorf("HTTP heartbeat format must be 'json' or 'comment': %s", c.HTTPServer.Heartbeat.Format) - } - } - - return nil + CheckIntervalMs int `toml:"check_interval_ms"` } \ No newline at end of file diff --git a/src/internal/config/loader.go b/src/internal/config/loader.go new file mode 100644 index 0000000..fee2248 --- /dev/null +++ b/src/internal/config/loader.go @@ -0,0 +1,102 @@ +// FILE: src/internal/config/loader.go +package config + +import ( + "fmt" + lconfig "github.com/lixenwraith/config" + "os" + "path/filepath" + "strings" +) + +func defaults() *Config { + return &Config{ + Monitor: MonitorConfig{ + CheckIntervalMs: 100, + }, + Streams: []StreamConfig{ + { + Name: "default", + Monitor: &StreamMonitorConfig{ + Targets: []MonitorTarget{ + {Path: "./", Pattern: "*.log", IsFile: false}, + }, + }, + HTTPServer: &HTTPConfig{ + Enabled: true, + Port: 8080, + BufferSize: 1000, + StreamPath: "/stream", + StatusPath: "/status", + Heartbeat: HeartbeatConfig{ + Enabled: true, + IntervalSeconds: 30, + IncludeTimestamp: true, + IncludeStats: false, + Format: "comment", + }, + }, + }, + }, + } +} + +func LoadWithCLI(cliArgs []string) (*Config, error) { + configPath := GetConfigPath() + + cfg, err := lconfig.NewBuilder(). + WithDefaults(defaults()). + WithEnvPrefix("LOGWISP_"). + WithFile(configPath). + WithArgs(cliArgs). + WithEnvTransform(customEnvTransform). + WithSources( + lconfig.SourceCLI, + lconfig.SourceEnv, + lconfig.SourceFile, + lconfig.SourceDefault, + ). + Build() + + if err != nil { + if !strings.Contains(err.Error(), "not found") { + return nil, fmt.Errorf("failed to load config: %w", err) + } + } + + finalConfig := &Config{} + if err := cfg.Scan("", finalConfig); err != nil { + return nil, fmt.Errorf("failed to scan config: %w", err) + } + + return finalConfig, finalConfig.validate() +} + +func customEnvTransform(path string) string { + env := strings.ReplaceAll(path, ".", "_") + env = strings.ToUpper(env) + env = "LOGWISP_" + env + return env +} + +func GetConfigPath() string { + if configFile := os.Getenv("LOGWISP_CONFIG_FILE"); configFile != "" { + if filepath.IsAbs(configFile) { + return configFile + } + if configDir := os.Getenv("LOGWISP_CONFIG_DIR"); configDir != "" { + return filepath.Join(configDir, configFile) + } + return configFile + } + + if configDir := os.Getenv("LOGWISP_CONFIG_DIR"); configDir != "" { + return filepath.Join(configDir, "logwisp.toml") + } + + if homeDir, err := os.UserHomeDir(); err == nil { + return filepath.Join(homeDir, ".config", "logwisp.toml") + } + + return "logwisp.toml" +} \ No newline at end of file diff --git a/src/internal/config/server.go b/src/internal/config/server.go new file mode 100644 index 0000000..1d0b435 --- /dev/null +++ b/src/internal/config/server.go @@ -0,0 +1,62 @@ +// FILE: src/internal/config/server.go +package config + +type TCPConfig struct { + Enabled bool `toml:"enabled"` + Port int `toml:"port"` + BufferSize int `toml:"buffer_size"` + + // SSL/TLS Configuration + SSL *SSLConfig `toml:"ssl"` + + // Rate limiting + RateLimit *RateLimitConfig `toml:"rate_limit"` + + // Heartbeat + Heartbeat HeartbeatConfig `toml:"heartbeat"` +} + +type HTTPConfig struct { + Enabled bool `toml:"enabled"` + Port int `toml:"port"` + BufferSize int `toml:"buffer_size"` + + // Endpoint paths + StreamPath string `toml:"stream_path"` + StatusPath string `toml:"status_path"` + + // SSL/TLS Configuration + SSL *SSLConfig `toml:"ssl"` + + // Rate limiting + RateLimit *RateLimitConfig `toml:"rate_limit"` + + // Heartbeat + Heartbeat HeartbeatConfig `toml:"heartbeat"` +} + +type HeartbeatConfig struct { + Enabled bool `toml:"enabled"` + IntervalSeconds int `toml:"interval_seconds"` + IncludeTimestamp bool `toml:"include_timestamp"` + IncludeStats bool `toml:"include_stats"` + Format string `toml:"format"` // "comment" or "json" +} + +type RateLimitConfig struct { + // Enable rate limiting + Enabled bool `toml:"enabled"` + + // Requests per second per client + RequestsPerSecond float64 `toml:"requests_per_second"` + + // Burst size (token bucket) + BurstSize int `toml:"burst_size"` + + // Rate limit by: "ip", "user", "token" + LimitBy string `toml:"limit_by"` + + // Response when rate limited + ResponseCode int `toml:"response_code"` // Default: 429 + ResponseMessage string `toml:"response_message"` // Default: "Rate limit exceeded" +} \ No newline at end of file diff --git a/src/internal/config/ssl.go b/src/internal/config/ssl.go new file mode 100644 index 0000000..81af516 --- /dev/null +++ b/src/internal/config/ssl.go @@ -0,0 +1,20 @@ +// FILE: src/internal/config/ssl.go +package config + +type SSLConfig struct { + Enabled bool `toml:"enabled"` + CertFile string `toml:"cert_file"` + KeyFile string `toml:"key_file"` + + // Client certificate authentication + ClientAuth bool `toml:"client_auth"` + ClientCAFile string `toml:"client_ca_file"` + VerifyClientCert bool `toml:"verify_client_cert"` + + // TLS version constraints + MinVersion string `toml:"min_version"` // "TLS1.2", "TLS1.3" + MaxVersion string `toml:"max_version"` + + // Cipher suites (comma-separated list) + CipherSuites string `toml:"cipher_suites"` +} \ No newline at end of file diff --git a/src/internal/config/stream.go b/src/internal/config/stream.go new file mode 100644 index 0000000..2fc026e --- /dev/null +++ b/src/internal/config/stream.go @@ -0,0 +1,42 @@ +// FILE: src/internal/config/stream.go +package config + +type StreamConfig struct { + // Stream identifier (used in logs and metrics) + Name string `toml:"name"` + + // Monitor configuration for this stream + Monitor *StreamMonitorConfig `toml:"monitor"` + + // Server configurations + TCPServer *TCPConfig `toml:"tcpserver"` + HTTPServer *HTTPConfig `toml:"httpserver"` + + // Authentication/Authorization + Auth *AuthConfig `toml:"auth"` +} + +type StreamMonitorConfig struct { + CheckIntervalMs *int `toml:"check_interval_ms"` + Targets []MonitorTarget `toml:"targets"` +} + +type MonitorTarget struct { + Path string `toml:"path"` + Pattern string `toml:"pattern"` + IsFile bool `toml:"is_file"` +} + +func (s *StreamConfig) GetTargets(defaultTargets []MonitorTarget) []MonitorTarget { + if s.Monitor != nil && len(s.Monitor.Targets) > 0 { + return s.Monitor.Targets + } + return nil +} + +func (s *StreamConfig) GetCheckInterval(defaultInterval int) int { + if s.Monitor != nil && s.Monitor.CheckIntervalMs != nil { + return *s.Monitor.CheckIntervalMs + } + return defaultInterval +} \ No newline at end of file diff --git a/src/internal/config/validation.go b/src/internal/config/validation.go new file mode 100644 index 0000000..8fe8c26 --- /dev/null +++ b/src/internal/config/validation.go @@ -0,0 +1,187 @@ +// FILE: src/internal/config/validation.go +package config + +import ( + "fmt" + "strings" +) + +func (c *Config) validate() error { + if c.Monitor.CheckIntervalMs < 10 { + return fmt.Errorf("check interval too small: %d ms", c.Monitor.CheckIntervalMs) + } + + if len(c.Streams) == 0 { + return fmt.Errorf("no streams configured") + } + + // Validate each stream + streamNames := make(map[string]bool) + streamPorts := make(map[int]string) + + for i, stream := range c.Streams { + if stream.Name == "" { + return fmt.Errorf("stream %d: missing name", i) + } + + if streamNames[stream.Name] { + return fmt.Errorf("stream %d: duplicate name '%s'", i, stream.Name) + } + streamNames[stream.Name] = true + + // Stream must have targets + if stream.Monitor == nil || len(stream.Monitor.Targets) == 0 { + return fmt.Errorf("stream '%s': no monitor targets specified", stream.Name) + } + + for j, target := range stream.Monitor.Targets { + if target.Path == "" { + return fmt.Errorf("stream '%s' target %d: empty path", stream.Name, j) + } + if strings.Contains(target.Path, "..") { + return fmt.Errorf("stream '%s' target %d: path contains directory traversal", stream.Name, j) + } + } + + // Validate TCP server + if stream.TCPServer != nil && stream.TCPServer.Enabled { + if stream.TCPServer.Port < 1 || stream.TCPServer.Port > 65535 { + return fmt.Errorf("stream '%s': invalid TCP port: %d", stream.Name, stream.TCPServer.Port) + } + if existing, exists := streamPorts[stream.TCPServer.Port]; exists { + return fmt.Errorf("stream '%s': TCP port %d already used by stream '%s'", + stream.Name, stream.TCPServer.Port, existing) + } + streamPorts[stream.TCPServer.Port] = stream.Name + "-tcp" + + if stream.TCPServer.BufferSize < 1 { + return fmt.Errorf("stream '%s': TCP buffer size must be positive: %d", + stream.Name, stream.TCPServer.BufferSize) + } + + if err := validateHeartbeat("TCP", stream.Name, &stream.TCPServer.Heartbeat); err != nil { + return err + } + + if err := validateSSL("TCP", stream.Name, stream.TCPServer.SSL); err != nil { + return err + } + } + + // Validate HTTP server + if stream.HTTPServer != nil && stream.HTTPServer.Enabled { + if stream.HTTPServer.Port < 1 || stream.HTTPServer.Port > 65535 { + return fmt.Errorf("stream '%s': invalid HTTP port: %d", stream.Name, stream.HTTPServer.Port) + } + if existing, exists := streamPorts[stream.HTTPServer.Port]; exists { + return fmt.Errorf("stream '%s': HTTP port %d already used by stream '%s'", + stream.Name, stream.HTTPServer.Port, existing) + } + streamPorts[stream.HTTPServer.Port] = stream.Name + "-http" + + if stream.HTTPServer.BufferSize < 1 { + return fmt.Errorf("stream '%s': HTTP buffer size must be positive: %d", + stream.Name, stream.HTTPServer.BufferSize) + } + + // Validate paths + if stream.HTTPServer.StreamPath == "" { + stream.HTTPServer.StreamPath = "/stream" + } + if stream.HTTPServer.StatusPath == "" { + stream.HTTPServer.StatusPath = "/status" + } + if !strings.HasPrefix(stream.HTTPServer.StreamPath, "/") { + return fmt.Errorf("stream '%s': stream path must start with /: %s", + stream.Name, stream.HTTPServer.StreamPath) + } + if !strings.HasPrefix(stream.HTTPServer.StatusPath, "/") { + return fmt.Errorf("stream '%s': status path must start with /: %s", + stream.Name, stream.HTTPServer.StatusPath) + } + + if err := validateHeartbeat("HTTP", stream.Name, &stream.HTTPServer.Heartbeat); err != nil { + return err + } + + if err := validateSSL("HTTP", stream.Name, stream.HTTPServer.SSL); err != nil { + return err + } + } + + // At least one server must be enabled + tcpEnabled := stream.TCPServer != nil && stream.TCPServer.Enabled + httpEnabled := stream.HTTPServer != nil && stream.HTTPServer.Enabled + if !tcpEnabled && !httpEnabled { + return fmt.Errorf("stream '%s': no servers enabled", stream.Name) + } + + // Validate auth if present + if err := validateAuth(stream.Name, stream.Auth); err != nil { + return err + } + } + + return nil +} + +func validateHeartbeat(serverType, streamName string, hb *HeartbeatConfig) error { + if hb.Enabled { + if hb.IntervalSeconds < 1 { + return fmt.Errorf("stream '%s' %s: heartbeat interval must be positive: %d", + streamName, serverType, hb.IntervalSeconds) + } + if hb.Format != "json" && hb.Format != "comment" { + return fmt.Errorf("stream '%s' %s: heartbeat format must be 'json' or 'comment': %s", + streamName, serverType, hb.Format) + } + } + return nil +} + +func validateSSL(serverType, streamName string, ssl *SSLConfig) error { + if ssl != nil && ssl.Enabled { + if ssl.CertFile == "" || ssl.KeyFile == "" { + return fmt.Errorf("stream '%s' %s: SSL enabled but cert/key files not specified", + streamName, serverType) + } + + if ssl.ClientAuth && ssl.ClientCAFile == "" { + return fmt.Errorf("stream '%s' %s: client auth enabled but CA file not specified", + streamName, serverType) + } + + // Validate TLS versions + validVersions := map[string]bool{"TLS1.0": true, "TLS1.1": true, "TLS1.2": true, "TLS1.3": true} + if ssl.MinVersion != "" && !validVersions[ssl.MinVersion] { + return fmt.Errorf("stream '%s' %s: invalid min TLS version: %s", + streamName, serverType, ssl.MinVersion) + } + if ssl.MaxVersion != "" && !validVersions[ssl.MaxVersion] { + return fmt.Errorf("stream '%s' %s: invalid max TLS version: %s", + streamName, serverType, ssl.MaxVersion) + } + } + return nil +} + +func validateAuth(streamName string, auth *AuthConfig) error { + if auth == nil { + return nil + } + + validTypes := map[string]bool{"none": true, "basic": true, "bearer": true, "mtls": true} + if !validTypes[auth.Type] { + return fmt.Errorf("stream '%s': invalid auth type: %s", streamName, auth.Type) + } + + if auth.Type == "basic" && auth.BasicAuth == nil { + return fmt.Errorf("stream '%s': basic auth type specified but config missing", streamName) + } + + if auth.Type == "bearer" && auth.BearerAuth == nil { + return fmt.Errorf("stream '%s': bearer auth type specified but config missing", streamName) + } + + return nil +} \ No newline at end of file diff --git a/src/internal/logstream/httprouter.go b/src/internal/logstream/httprouter.go new file mode 100644 index 0000000..3b02324 --- /dev/null +++ b/src/internal/logstream/httprouter.go @@ -0,0 +1,107 @@ +// FILE: src/internal/logstream/httprouter.go +package logstream + +import ( + "fmt" + "strings" + "sync" + + "github.com/valyala/fasthttp" +) + +type HTTPRouter struct { + service *Service + servers map[int]*routerServer // port -> server + mu sync.RWMutex +} + +func NewHTTPRouter(service *Service) *HTTPRouter { + return &HTTPRouter{ + service: service, + servers: make(map[int]*routerServer), + } +} + +func (r *HTTPRouter) RegisterStream(stream *LogStream) error { + if stream.HTTPServer == nil || stream.Config.HTTPServer == nil { + return nil // No HTTP server configured + } + + port := stream.Config.HTTPServer.Port + + r.mu.Lock() + rs, exists := r.servers[port] + if !exists { + // Create new server for this port + rs = &routerServer{ + port: port, + routes: make(map[string]*LogStream), + } + rs.server = &fasthttp.Server{ + Handler: rs.requestHandler, + DisableKeepalive: false, + StreamRequestBody: true, + } + r.servers[port] = rs + + // Start server in background + go func() { + addr := fmt.Sprintf(":%d", port) + if err := rs.server.ListenAndServe(addr); err != nil { + // Log error but don't crash + fmt.Printf("Router server on port %d failed: %v\n", port, err) + } + }() + } + r.mu.Unlock() + + // Register routes for this stream + rs.routeMu.Lock() + defer rs.routeMu.Unlock() + + // Use stream name as path prefix + pathPrefix := "/" + stream.Name + + // Check for conflicts + for existingPath, existingStream := range rs.routes { + if strings.HasPrefix(pathPrefix, existingPath) || strings.HasPrefix(existingPath, pathPrefix) { + return fmt.Errorf("path conflict: '%s' conflicts with existing stream '%s' at '%s'", + pathPrefix, existingStream.Name, existingPath) + } + } + + rs.routes[pathPrefix] = stream + return nil +} + +func (r *HTTPRouter) UnregisterStream(streamName string) { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, rs := range r.servers { + rs.routeMu.Lock() + for path, stream := range rs.routes { + if stream.Name == streamName { + delete(rs.routes, path) + } + } + rs.routeMu.Unlock() + } +} + +func (r *HTTPRouter) Shutdown() { + r.mu.Lock() + defer r.mu.Unlock() + + var wg sync.WaitGroup + for port, rs := range r.servers { + wg.Add(1) + go func(p int, s *routerServer) { + defer wg.Done() + if err := s.server.Shutdown(); err != nil { + fmt.Printf("Error shutting down router server on port %d: %v\n", p, err) + } + }(port, rs) + } + wg.Wait() +} \ No newline at end of file diff --git a/src/internal/logstream/logstream.go b/src/internal/logstream/logstream.go new file mode 100644 index 0000000..2ce666d --- /dev/null +++ b/src/internal/logstream/logstream.go @@ -0,0 +1,124 @@ +// FILE: src/internal/logstream/logstream.go +package logstream + +import ( + "context" + "fmt" + "logwisp/src/internal/config" + "sync" + "time" +) + +func (ls *LogStream) Shutdown() { + // Stop servers first + var wg sync.WaitGroup + + if ls.TCPServer != nil { + wg.Add(1) + go func() { + defer wg.Done() + ls.TCPServer.Stop() + }() + } + + if ls.HTTPServer != nil { + wg.Add(1) + go func() { + defer wg.Done() + ls.HTTPServer.Stop() + }() + } + + // Cancel context + ls.cancel() + + // Wait for servers + wg.Wait() + + // Stop monitor + ls.Monitor.Stop() +} + +func (ls *LogStream) GetStats() map[string]interface{} { + monStats := ls.Monitor.GetStats() + + stats := map[string]interface{}{ + "name": ls.Name, + "uptime_seconds": int(time.Since(ls.Stats.StartTime).Seconds()), + "monitor": monStats, + } + + if ls.TCPServer != nil { + currentConnections := ls.TCPServer.GetActiveConnections() + + stats["tcp"] = map[string]interface{}{ + "enabled": true, + "port": ls.Config.TCPServer.Port, + "connections": currentConnections, // Use current value + } + } + + if ls.HTTPServer != nil { + currentConnections := ls.HTTPServer.GetActiveConnections() + + stats["http"] = map[string]interface{}{ + "enabled": true, + "port": ls.Config.HTTPServer.Port, + "connections": currentConnections, // Use current value + "stream_path": ls.Config.HTTPServer.StreamPath, + "status_path": ls.Config.HTTPServer.StatusPath, + } + } + + return stats +} + +func (ls *LogStream) UpdateTargets(targets []config.MonitorTarget) error { + // Clear existing targets + for _, watcher := range ls.Monitor.GetActiveWatchers() { + ls.Monitor.RemoveTarget(watcher.Path) + } + + // Add new targets + for _, target := range targets { + if err := ls.Monitor.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil { + return err + } + } + + return nil +} + +func (ls *LogStream) startStatsUpdater(ctx context.Context) { + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Update cached values + if ls.TCPServer != nil { + oldTCP := ls.Stats.TCPConnections + ls.Stats.TCPConnections = ls.TCPServer.GetActiveConnections() + if oldTCP != ls.Stats.TCPConnections { + // This debug should now show changes + fmt.Printf("[STATS DEBUG] %s TCP: %d -> %d\n", + ls.Name, oldTCP, ls.Stats.TCPConnections) + } + } + if ls.HTTPServer != nil { + oldHTTP := ls.Stats.HTTPConnections + ls.Stats.HTTPConnections = ls.HTTPServer.GetActiveConnections() + if oldHTTP != ls.Stats.HTTPConnections { + // This debug should now show changes + fmt.Printf("[STATS DEBUG] %s HTTP: %d -> %d\n", + ls.Name, oldHTTP, ls.Stats.HTTPConnections) + } + } + } + } + }() +} \ No newline at end of file diff --git a/src/internal/logstream/routerserver.go b/src/internal/logstream/routerserver.go new file mode 100644 index 0000000..c8f31c6 --- /dev/null +++ b/src/internal/logstream/routerserver.go @@ -0,0 +1,118 @@ +// FILE: src/internal/config/routerserver.go +package logstream + +import ( + "encoding/json" + "fmt" + "github.com/valyala/fasthttp" + "strings" + "sync" +) + +type routerServer struct { + port int + server *fasthttp.Server + routes map[string]*LogStream // path prefix -> stream + routeMu sync.RWMutex +} + +func (rs *routerServer) requestHandler(ctx *fasthttp.RequestCtx) { + path := string(ctx.Path()) + + // Special case: global status at /status + if path == "/status" { + rs.handleGlobalStatus(ctx) + return + } + + // Find matching stream + rs.routeMu.RLock() + var matchedStream *LogStream + var matchedPrefix string + var remainingPath string + + for prefix, stream := range rs.routes { + if strings.HasPrefix(path, prefix) { + // Use longest prefix match + if len(prefix) > len(matchedPrefix) { + matchedPrefix = prefix + matchedStream = stream + remainingPath = strings.TrimPrefix(path, prefix) + } + } + } + rs.routeMu.RUnlock() + + if matchedStream == nil { + rs.handleNotFound(ctx) + return + } + + // Route to stream's handler + if matchedStream.HTTPServer != nil { + // Rewrite path to remove stream prefix + ctx.URI().SetPath(remainingPath) + matchedStream.HTTPServer.RouteRequest(ctx) + } else { + ctx.SetStatusCode(fasthttp.StatusServiceUnavailable) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]string{ + "error": "Stream HTTP server not available", + }) + } +} + +func (rs *routerServer) handleGlobalStatus(ctx *fasthttp.RequestCtx) { + ctx.SetContentType("application/json") + + rs.routeMu.RLock() + streams := make(map[string]interface{}) + for prefix, stream := range rs.routes { + streams[stream.Name] = map[string]interface{}{ + "path_prefix": prefix, + "config": map[string]interface{}{ + "stream_path": stream.Config.HTTPServer.StreamPath, + "status_path": stream.Config.HTTPServer.StatusPath, + }, + "stats": stream.GetStats(), + } + } + rs.routeMu.RUnlock() + + status := map[string]interface{}{ + "service": "LogWisp Router", + "port": rs.port, + "streams": streams, + "total_streams": len(streams), + } + + data, _ := json.Marshal(status) + ctx.SetBody(data) +} + +func (rs *routerServer) handleNotFound(ctx *fasthttp.RequestCtx) { + ctx.SetStatusCode(fasthttp.StatusNotFound) + ctx.SetContentType("application/json") + + rs.routeMu.RLock() + availableRoutes := make([]string, 0, len(rs.routes)*2+1) + availableRoutes = append(availableRoutes, "/status (global status)") + + for prefix, stream := range rs.routes { + if stream.Config.HTTPServer != nil { + availableRoutes = append(availableRoutes, + fmt.Sprintf("%s%s (stream: %s)", prefix, stream.Config.HTTPServer.StreamPath, stream.Name), + fmt.Sprintf("%s%s (status: %s)", prefix, stream.Config.HTTPServer.StatusPath, stream.Name), + ) + } + } + rs.routeMu.RUnlock() + + response := map[string]interface{}{ + "error": "Not Found", + "available_routes": availableRoutes, + } + + data, _ := json.Marshal(response) + ctx.SetBody(data) +} \ No newline at end of file diff --git a/src/internal/logstream/service.go b/src/internal/logstream/service.go new file mode 100644 index 0000000..e488661 --- /dev/null +++ b/src/internal/logstream/service.go @@ -0,0 +1,235 @@ +// FILE: src/internal/logstream/service.go +package logstream + +import ( + "context" + "fmt" + "sync" + "time" + + "logwisp/src/internal/config" + "logwisp/src/internal/monitor" + "logwisp/src/internal/stream" +) + +type Service struct { + streams map[string]*LogStream + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +type LogStream struct { + Name string + Config config.StreamConfig + Monitor monitor.Monitor + TCPServer *stream.TCPStreamer + HTTPServer *stream.HTTPStreamer + Stats *StreamStats + + ctx context.Context + cancel context.CancelFunc +} + +type StreamStats struct { + StartTime time.Time + MonitorStats monitor.Stats + TCPConnections int32 + HTTPConnections int32 + TotalBytesServed uint64 + TotalEntriesServed uint64 +} + +func New(ctx context.Context) *Service { + serviceCtx, cancel := context.WithCancel(ctx) + return &Service{ + streams: make(map[string]*LogStream), + ctx: serviceCtx, + cancel: cancel, + } +} + +func (s *Service) CreateStream(cfg config.StreamConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.streams[cfg.Name]; exists { + return fmt.Errorf("stream '%s' already exists", cfg.Name) + } + + // Create stream context + streamCtx, streamCancel := context.WithCancel(s.ctx) + + // Create monitor + mon := monitor.New() + mon.SetCheckInterval(time.Duration(cfg.GetCheckInterval(100)) * time.Millisecond) + + // Add targets + for _, target := range cfg.GetTargets(nil) { + if err := mon.AddTarget(target.Path, target.Pattern, target.IsFile); err != nil { + streamCancel() + return fmt.Errorf("failed to add target %s: %w", target.Path, err) + } + } + + // Start monitor + if err := mon.Start(streamCtx); err != nil { + streamCancel() + return fmt.Errorf("failed to start monitor: %w", err) + } + + // Create log stream + ls := &LogStream{ + Name: cfg.Name, + Config: cfg, + Monitor: mon, + Stats: &StreamStats{ + StartTime: time.Now(), + }, + ctx: streamCtx, + cancel: streamCancel, + } + + // Start TCP server if configured + if cfg.TCPServer != nil && cfg.TCPServer.Enabled { + tcpChan := mon.Subscribe() + ls.TCPServer = stream.NewTCPStreamer(tcpChan, *cfg.TCPServer) + + if err := s.startTCPServer(ls); err != nil { + ls.Shutdown() + return fmt.Errorf("TCP server failed: %w", err) + } + } + + // Start HTTP server if configured + if cfg.HTTPServer != nil && cfg.HTTPServer.Enabled { + httpChan := mon.Subscribe() + ls.HTTPServer = stream.NewHTTPStreamer(httpChan, *cfg.HTTPServer) + + if err := s.startHTTPServer(ls); err != nil { + ls.Shutdown() + return fmt.Errorf("HTTP server failed: %w", err) + } + } + + ls.startStatsUpdater(streamCtx) + + s.streams[cfg.Name] = ls + return nil +} + +func (s *Service) GetStream(name string) (*LogStream, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + stream, exists := s.streams[name] + if !exists { + return nil, fmt.Errorf("stream '%s' not found", name) + } + return stream, nil +} + +func (s *Service) ListStreams() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + names := make([]string, 0, len(s.streams)) + for name := range s.streams { + names = append(names, name) + } + return names +} + +func (s *Service) RemoveStream(name string) error { + s.mu.Lock() + defer s.mu.Unlock() + + stream, exists := s.streams[name] + if !exists { + return fmt.Errorf("stream '%s' not found", name) + } + + stream.Shutdown() + delete(s.streams, name) + return nil +} + +func (s *Service) Shutdown() { + s.mu.Lock() + streams := make([]*LogStream, 0, len(s.streams)) + for _, stream := range s.streams { + streams = append(streams, stream) + } + s.mu.Unlock() + + // Stop all streams concurrently + var wg sync.WaitGroup + for _, stream := range streams { + wg.Add(1) + go func(ls *LogStream) { + defer wg.Done() + ls.Shutdown() + }(stream) + } + wg.Wait() + + s.cancel() + s.wg.Wait() +} + +func (s *Service) GetGlobalStats() map[string]interface{} { + s.mu.RLock() + defer s.mu.RUnlock() + + stats := map[string]interface{}{ + "streams": make(map[string]interface{}), + "total_streams": len(s.streams), + } + + for name, stream := range s.streams { + stats["streams"].(map[string]interface{})[name] = stream.GetStats() + } + + return stats +} + +func (s *Service) startTCPServer(ls *LogStream) error { + errChan := make(chan error, 1) + s.wg.Add(1) + + go func() { + defer s.wg.Done() + if err := ls.TCPServer.Start(); err != nil { + errChan <- err + } + }() + + // Check startup + select { + case err := <-errChan: + return err + case <-time.After(time.Second): + return nil + } +} + +func (s *Service) startHTTPServer(ls *LogStream) error { + errChan := make(chan error, 1) + s.wg.Add(1) + + go func() { + defer s.wg.Done() + if err := ls.HTTPServer.Start(); err != nil { + errChan <- err + } + }() + + // Check startup + select { + case err := <-errChan: + return err + case <-time.After(time.Second): + return nil + } +} \ No newline at end of file diff --git a/src/internal/monitor/file_watcher.go b/src/internal/monitor/file_watcher.go index 902171a..9831c70 100644 --- a/src/internal/monitor/file_watcher.go +++ b/src/internal/monitor/file_watcher.go @@ -1,3 +1,4 @@ +// FILE: src/internal/monitor/file_watcher.go package monitor import ( @@ -11,32 +12,38 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "syscall" "time" ) type fileWatcher struct { - path string - callback func(LogEntry) - position int64 - size int64 - inode uint64 - modTime time.Time - mu sync.Mutex - stopped bool - rotationSeq int + path string + callback func(LogEntry) + position int64 + size int64 + inode uint64 + modTime time.Time + mu sync.Mutex + stopped bool + rotationSeq int + entriesRead atomic.Uint64 + lastReadTime atomic.Value // time.Time } func newFileWatcher(path string, callback func(LogEntry)) *fileWatcher { - return &fileWatcher{ + w := &fileWatcher{ path: path, callback: callback, + position: -1, } + w.lastReadTime.Store(time.Time{}) + return w } -func (w *fileWatcher) watch(ctx context.Context) { +func (w *fileWatcher) watch(ctx context.Context) error { if err := w.seekToEnd(); err != nil { - return + return fmt.Errorf("seekToEnd failed: %w", err) } ticker := time.NewTicker(100 * time.Millisecond) @@ -45,12 +52,15 @@ func (w *fileWatcher) watch(ctx context.Context) { for { select { case <-ctx.Done(): - return + return ctx.Err() case <-ticker.C: if w.isStopped() { - return + return fmt.Errorf("watcher stopped") + } + if err := w.checkFile(); err != nil { + // Log error but continue watching + fmt.Printf("[WARN] checkFile error for %s: %v\n", w.path, err) } - w.checkFile() } } } @@ -58,6 +68,17 @@ func (w *fileWatcher) watch(ctx context.Context) { func (w *fileWatcher) seekToEnd() error { file, err := os.Open(w.path) if err != nil { + // For non-existent files, initialize position to 0 + // This allows watching files that don't exist yet + if os.IsNotExist(err) { + w.mu.Lock() + w.position = 0 + w.size = 0 + w.modTime = time.Now() + w.inode = 0 + w.mu.Unlock() + return nil + } return err } defer file.Close() @@ -67,16 +88,21 @@ func (w *fileWatcher) seekToEnd() error { return err } - pos, err := file.Seek(0, io.SeekEnd) - if err != nil { - return err + w.mu.Lock() + // Only seek to end if position was never set (-1) + // This preserves position = 0 for new files while allowing + // directory-discovered files to start reading from current position + if w.position == -1 { + pos, err := file.Seek(0, io.SeekEnd) + if err != nil { + w.mu.Unlock() + return err + } + w.position = pos } - w.mu.Lock() - w.position = pos w.size = info.Size() w.modTime = info.ModTime() - if stat, ok := info.Sys().(*syscall.Stat_t); ok { w.inode = stat.Ino } @@ -88,6 +114,10 @@ func (w *fileWatcher) seekToEnd() error { func (w *fileWatcher) checkFile() error { file, err := os.Open(w.path) if err != nil { + if os.IsNotExist(err) { + // File doesn't exist yet, keep watching + return nil + } return err } defer file.Close() @@ -112,36 +142,49 @@ func (w *fileWatcher) checkFile() error { currentInode = stat.Ino } + // Handle first time seeing a file that didn't exist before + if oldInode == 0 && currentInode != 0 { + // File just appeared, don't treat as rotation + w.mu.Lock() + w.inode = currentInode + w.size = currentSize + w.modTime = currentModTime + // Keep position at 0 to read from beginning if this is a new file + // or seek to end if we want to skip existing content + if oldSize == 0 && w.position == 0 { + // First time seeing this file, seek to end to skip existing content + w.position = currentSize + } + w.mu.Unlock() + return nil + } + + // Check for rotation rotated := false rotationReason := "" if oldInode != 0 && currentInode != 0 && currentInode != oldInode { rotated = true rotationReason = "inode change" - } - - if !rotated && currentSize < oldSize { + } else if currentSize < oldSize { rotated = true rotationReason = "size decrease" - } - - if !rotated && currentModTime.Before(oldModTime) && currentSize <= oldSize { + } else if currentModTime.Before(oldModTime) && currentSize <= oldSize { rotated = true rotationReason = "modification time reset" - } - - if !rotated && oldPos > currentSize+1024 { + } else if oldPos > currentSize+1024 { rotated = true rotationReason = "position beyond file size" } - newPos := oldPos + startPos := oldPos if rotated { - newPos = 0 + startPos = 0 w.mu.Lock() w.rotationSeq++ seq := w.rotationSeq w.inode = currentInode + w.position = 0 // Reset position on rotation w.mu.Unlock() w.callback(LogEntry{ @@ -152,32 +195,52 @@ func (w *fileWatcher) checkFile() error { }) } - if _, err := file.Seek(newPos, io.SeekStart); err != nil { - return err - } - - scanner := bufio.NewScanner(file) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) - - for scanner.Scan() { - line := scanner.Text() - if line == "" { - continue + // Only read if there's new content + if currentSize > startPos { + if _, err := file.Seek(startPos, io.SeekStart); err != nil { + return err } - entry := w.parseLine(line) - w.callback(entry) + scanner := bufio.NewScanner(file) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + + entry := w.parseLine(line) + w.callback(entry) + w.entriesRead.Add(1) + w.lastReadTime.Store(time.Now()) + } + + // Update position after successful read + if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil { + w.mu.Lock() + w.position = currentPos + w.size = currentSize + w.modTime = currentModTime + if !rotated && currentInode != 0 { + w.inode = currentInode + } + w.mu.Unlock() + } + + return scanner.Err() } - if currentPos, err := file.Seek(0, io.SeekCurrent); err == nil { - w.mu.Lock() - w.position = currentPos - w.size = currentSize - w.modTime = currentModTime - w.mu.Unlock() + // Update metadata even if no new content + w.mu.Lock() + w.size = currentSize + w.modTime = currentModTime + if currentInode != 0 { + w.inode = currentInode } + w.mu.Unlock() - return scanner.Err() + return nil } func (w *fileWatcher) parseLine(line string) LogEntry { @@ -244,6 +307,25 @@ func globToRegex(glob string) string { return "^" + regex + "$" } +func (w *fileWatcher) getInfo() WatcherInfo { + w.mu.Lock() + info := WatcherInfo{ + Path: w.path, + Size: w.size, + Position: w.position, + ModTime: w.modTime, + EntriesRead: w.entriesRead.Load(), + Rotations: w.rotationSeq, + } + w.mu.Unlock() + + if lastRead, ok := w.lastReadTime.Load().(time.Time); ok { + info.LastReadTime = lastRead + } + + return info +} + func (w *fileWatcher) close() { w.stop() } diff --git a/src/internal/monitor/monitor.go b/src/internal/monitor/monitor.go index f67b2e1..6332741 100644 --- a/src/internal/monitor/monitor.go +++ b/src/internal/monitor/monitor.go @@ -9,6 +9,7 @@ import ( "path/filepath" "regexp" "sync" + "sync/atomic" "time" ) @@ -20,15 +21,48 @@ type LogEntry struct { Fields json.RawMessage `json:"fields,omitempty"` } -type Monitor struct { - subscribers []chan LogEntry - targets []target - watchers map[string]*fileWatcher - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - checkInterval time.Duration +type Monitor interface { + Start(ctx context.Context) error + Stop() + Subscribe() chan LogEntry + AddTarget(path, pattern string, isFile bool) error + RemoveTarget(path string) error + SetCheckInterval(interval time.Duration) + GetStats() Stats + GetActiveWatchers() []WatcherInfo +} + +type Stats struct { + ActiveWatchers int + TotalEntries uint64 + DroppedEntries uint64 + StartTime time.Time + LastEntryTime time.Time +} + +type WatcherInfo struct { + Path string + Size int64 + Position int64 + ModTime time.Time + EntriesRead uint64 + LastReadTime time.Time + Rotations int +} + +type monitor struct { + subscribers []chan LogEntry + targets []target + watchers map[string]*fileWatcher + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + checkInterval time.Duration + totalEntries atomic.Uint64 + droppedEntries atomic.Uint64 + startTime time.Time + lastEntryTime atomic.Value // time.Time } type target struct { @@ -38,14 +72,17 @@ type target struct { regex *regexp.Regexp } -func New() *Monitor { - return &Monitor{ +func New() Monitor { + m := &monitor{ watchers: make(map[string]*fileWatcher), checkInterval: 100 * time.Millisecond, + startTime: time.Now(), } + m.lastEntryTime.Store(time.Time{}) + return m } -func (m *Monitor) Subscribe() chan LogEntry { +func (m *monitor) Subscribe() chan LogEntry { m.mu.Lock() defer m.mu.Unlock() @@ -54,26 +91,29 @@ func (m *Monitor) Subscribe() chan LogEntry { return ch } -func (m *Monitor) publish(entry LogEntry) { +func (m *monitor) publish(entry LogEntry) { m.mu.RLock() defer m.mu.RUnlock() + m.totalEntries.Add(1) + m.lastEntryTime.Store(entry.Time) + for _, ch := range m.subscribers { select { case ch <- entry: default: - // Drop message if channel full + m.droppedEntries.Add(1) } } } -func (m *Monitor) SetCheckInterval(interval time.Duration) { +func (m *monitor) SetCheckInterval(interval time.Duration) { m.mu.Lock() m.checkInterval = interval m.mu.Unlock() } -func (m *Monitor) AddTarget(path, pattern string, isFile bool) error { +func (m *monitor) AddTarget(path, pattern string, isFile bool) error { absPath, err := filepath.Abs(path) if err != nil { return fmt.Errorf("invalid path %s: %w", path, err) @@ -100,14 +140,41 @@ func (m *Monitor) AddTarget(path, pattern string, isFile bool) error { return nil } -func (m *Monitor) Start(ctx context.Context) error { +func (m *monitor) RemoveTarget(path string) error { + absPath, err := filepath.Abs(path) + if err != nil { + return fmt.Errorf("invalid path %s: %w", path, err) + } + + m.mu.Lock() + defer m.mu.Unlock() + + // Remove from targets + newTargets := make([]target, 0, len(m.targets)) + for _, t := range m.targets { + if t.path != absPath { + newTargets = append(newTargets, t) + } + } + m.targets = newTargets + + // Stop any watchers for this path + if w, exists := m.watchers[absPath]; exists { + w.stop() + delete(m.watchers, absPath) + } + + return nil +} + +func (m *monitor) Start(ctx context.Context) error { m.ctx, m.cancel = context.WithCancel(ctx) m.wg.Add(1) go m.monitorLoop() return nil } -func (m *Monitor) Stop() { +func (m *monitor) Stop() { if m.cancel != nil { m.cancel() } @@ -123,7 +190,34 @@ func (m *Monitor) Stop() { m.mu.Unlock() } -func (m *Monitor) monitorLoop() { +func (m *monitor) GetStats() Stats { + lastEntry, _ := m.lastEntryTime.Load().(time.Time) + + m.mu.RLock() + watcherCount := len(m.watchers) + m.mu.RUnlock() + + return Stats{ + ActiveWatchers: watcherCount, + TotalEntries: m.totalEntries.Load(), + DroppedEntries: m.droppedEntries.Load(), + StartTime: m.startTime, + LastEntryTime: lastEntry, + } +} + +func (m *monitor) GetActiveWatchers() []WatcherInfo { + m.mu.RLock() + defer m.mu.RUnlock() + + info := make([]WatcherInfo, 0, len(m.watchers)) + for _, w := range m.watchers { + info = append(info, w.getInfo()) + } + return info +} + +func (m *monitor) monitorLoop() { defer m.wg.Done() m.checkTargets() @@ -155,7 +249,7 @@ func (m *Monitor) monitorLoop() { } } -func (m *Monitor) checkTargets() { +func (m *monitor) checkTargets() { m.mu.RLock() targets := make([]target, len(m.targets)) copy(targets, m.targets) @@ -165,10 +259,13 @@ func (m *Monitor) checkTargets() { if t.isFile { m.ensureWatcher(t.path) } else { + // Directory scanning for pattern matching files, err := m.scanDirectory(t.path, t.regex) if err != nil { + fmt.Printf("[DEBUG] Error scanning directory %s: %v\n", t.path, err) continue } + for _, file := range files { m.ensureWatcher(file) } @@ -178,7 +275,7 @@ func (m *Monitor) checkTargets() { m.cleanupWatchers() } -func (m *Monitor) scanDirectory(dir string, pattern *regexp.Regexp) ([]string, error) { +func (m *monitor) scanDirectory(dir string, pattern *regexp.Regexp) ([]string, error) { entries, err := os.ReadDir(dir) if err != nil { return nil, err @@ -199,7 +296,7 @@ func (m *Monitor) scanDirectory(dir string, pattern *regexp.Regexp) ([]string, e return files, nil } -func (m *Monitor) ensureWatcher(path string) { +func (m *monitor) ensureWatcher(path string) { m.mu.Lock() defer m.mu.Unlock() @@ -207,17 +304,17 @@ func (m *Monitor) ensureWatcher(path string) { return } - if _, err := os.Stat(path); os.IsNotExist(err) { - return - } - w := newFileWatcher(path, m.publish) m.watchers[path] = w + fmt.Printf("[DEBUG] Created watcher for: %s\n", path) + m.wg.Add(1) go func() { defer m.wg.Done() - w.watch(m.ctx) + if err := w.watch(m.ctx); err != nil { + fmt.Printf("[ERROR] Watcher for %s failed: %v\n", path, err) + } m.mu.Lock() delete(m.watchers, path) @@ -225,7 +322,7 @@ func (m *Monitor) ensureWatcher(path string) { }() } -func (m *Monitor) cleanupWatchers() { +func (m *monitor) cleanupWatchers() { m.mu.Lock() defer m.mu.Unlock() diff --git a/src/internal/stream/http.go b/src/internal/stream/httpstreamer.go similarity index 61% rename from src/internal/stream/http.go rename to src/internal/stream/httpstreamer.go index 24082db..5e15bad 100644 --- a/src/internal/stream/http.go +++ b/src/internal/stream/httpstreamer.go @@ -1,4 +1,4 @@ -// FILE: src/internal/stream/http.go +// FILE: src/internal/stream/httpstreamer.go package stream import ( @@ -25,23 +25,53 @@ type HTTPStreamer struct { startTime time.Time done chan struct{} wg sync.WaitGroup + + // Path configuration + streamPath string + statusPath string + + // For router integration + standalone bool } func NewHTTPStreamer(logChan chan monitor.LogEntry, cfg config.HTTPConfig) *HTTPStreamer { + // Set default paths if not configured + streamPath := cfg.StreamPath + if streamPath == "" { + streamPath = "/stream" + } + statusPath := cfg.StatusPath + if statusPath == "" { + statusPath = "/status" + } + return &HTTPStreamer{ - logChan: logChan, - config: cfg, - startTime: time.Now(), - done: make(chan struct{}), + logChan: logChan, + config: cfg, + startTime: time.Now(), + done: make(chan struct{}), + streamPath: streamPath, + statusPath: statusPath, + standalone: true, // Default to standalone mode } } +// SetRouterMode configures the streamer for use with a router +func (h *HTTPStreamer) SetRouterMode() { + h.standalone = false +} + func (h *HTTPStreamer) Start() error { + if !h.standalone { + // In router mode, don't start our own server + return nil + } + h.server = &fasthttp.Server{ Handler: h.requestHandler, DisableKeepalive: false, StreamRequestBody: true, - Logger: nil, // Suppress fasthttp logs + Logger: nil, } addr := fmt.Sprintf(":%d", h.config.Port) @@ -69,13 +99,10 @@ func (h *HTTPStreamer) Stop() { // Signal all client handlers to stop close(h.done) - // Shutdown HTTP server - if h.server != nil { - // Create context with timeout for server shutdown + // Shutdown HTTP server if in standalone mode + if h.standalone && h.server != nil { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - - // Use ShutdownWithContext for graceful shutdown h.server.ShutdownWithContext(ctx) } @@ -83,16 +110,26 @@ func (h *HTTPStreamer) Stop() { h.wg.Wait() } +func (h *HTTPStreamer) RouteRequest(ctx *fasthttp.RequestCtx) { + h.requestHandler(ctx) +} + func (h *HTTPStreamer) requestHandler(ctx *fasthttp.RequestCtx) { path := string(ctx.Path()) switch path { - case "/stream": + case h.streamPath: h.handleStream(ctx) - case "/status": + case h.statusPath: h.handleStatus(ctx) default: ctx.SetStatusCode(fasthttp.StatusNotFound) + ctx.SetContentType("application/json") + json.NewEncoder(ctx).Encode(map[string]interface{}{ + "error": "Not Found", + "message": fmt.Sprintf("Available endpoints: %s (SSE stream), %s (status)", + h.streamPath, h.statusPath), + }) } } @@ -104,13 +141,6 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") ctx.Response.Header.Set("X-Accel-Buffering", "no") - h.activeClients.Add(1) - h.wg.Add(1) // Track this client handler - defer func() { - h.activeClients.Add(-1) - h.wg.Done() // Mark handler as done - }() - // Create subscription for this client clientChan := make(chan monitor.LogEntry, h.config.BufferSize) clientDone := make(chan struct{}) @@ -128,14 +158,14 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { case clientChan <- entry: case <-clientDone: return - case <-h.done: // Check for server shutdown + case <-h.done: return default: // Drop if client buffer full } case <-clientDone: return - case <-h.done: // Check for server shutdown + case <-h.done: return } } @@ -143,11 +173,28 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { // Define the stream writer function streamFunc := func(w *bufio.Writer) { - defer close(clientDone) + newCount := h.activeClients.Add(1) + fmt.Printf("[HTTP DEBUG] Client connected on port %d. Count now: %d\n", + h.config.Port, newCount) + + h.wg.Add(1) + defer func() { + newCount := h.activeClients.Add(-1) + fmt.Printf("[HTTP DEBUG] Client disconnected on port %d. Count now: %d\n", + h.config.Port, newCount) + h.wg.Done() + }() // Send initial connected event clientID := fmt.Sprintf("%d", time.Now().UnixNano()) - fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID) + connectionInfo := map[string]interface{}{ + "client_id": clientID, + "stream_path": h.streamPath, + "status_path": h.statusPath, + "buffer_size": h.config.BufferSize, + } + data, _ := json.Marshal(connectionInfo) + fmt.Fprintf(w, "event: connected\ndata: %s\n\n", data) w.Flush() var ticker *time.Ticker @@ -184,7 +231,7 @@ func (h *HTTPStreamer) handleStream(ctx *fasthttp.RequestCtx) { } } - case <-h.done: // ADDED: Check for server shutdown + case <-h.done: // Send final disconnect event fmt.Fprintf(w, "event: disconnect\ndata: {\"reason\":\"server_shutdown\"}\n\n") w.Flush() @@ -240,13 +287,48 @@ func (h *HTTPStreamer) handleStatus(ctx *fasthttp.RequestCtx) { status := map[string]interface{}{ "service": "LogWisp", "version": "3.0.0", - "http_server": map[string]interface{}{ + "server": map[string]interface{}{ + "type": "http", "port": h.config.Port, "active_clients": h.activeClients.Load(), "buffer_size": h.config.BufferSize, + "uptime_seconds": int(time.Since(h.startTime).Seconds()), + "mode": map[string]bool{"standalone": h.standalone, "router": !h.standalone}, + }, + "endpoints": map[string]string{ + "stream": h.streamPath, + "status": h.statusPath, + }, + "features": map[string]interface{}{ + "heartbeat": map[string]interface{}{ + "enabled": h.config.Heartbeat.Enabled, + "interval": h.config.Heartbeat.IntervalSeconds, + "format": h.config.Heartbeat.Format, + }, + "ssl": map[string]bool{ + "enabled": h.config.SSL != nil && h.config.SSL.Enabled, + }, + "rate_limit": map[string]bool{ + "enabled": h.config.RateLimit != nil && h.config.RateLimit.Enabled, + }, }, } data, _ := json.Marshal(status) ctx.SetBody(data) +} + +// GetActiveConnections returns the current number of active clients +func (h *HTTPStreamer) GetActiveConnections() int32 { + return h.activeClients.Load() +} + +// GetStreamPath returns the configured stream endpoint path +func (h *HTTPStreamer) GetStreamPath() string { + return h.streamPath +} + +// GetStatusPath returns the configured status endpoint path +func (h *HTTPStreamer) GetStatusPath() string { + return h.statusPath } \ No newline at end of file diff --git a/src/internal/stream/tcpserver.go b/src/internal/stream/tcpserver.go new file mode 100644 index 0000000..aa2b6e9 --- /dev/null +++ b/src/internal/stream/tcpserver.go @@ -0,0 +1,52 @@ +// FILE: src/internal/monitor/tcpserver.go +package stream + +import ( + "fmt" + "github.com/panjf2000/gnet/v2" + "sync" +) + +type tcpServer struct { + gnet.BuiltinEventEngine + streamer *TCPStreamer + connections sync.Map +} + +func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action { + // Store engine reference for shutdown + s.streamer.engine = &eng + return gnet.None +} + +func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + s.connections.Store(c, struct{}{}) + + oldCount := s.streamer.activeConns.Load() + newCount := s.streamer.activeConns.Add(1) + fmt.Printf("[TCP ATOMIC] OnOpen: %d -> %d (expected: %d)\n", oldCount, newCount, oldCount+1) + + fmt.Printf("[TCP DEBUG] Connection opened. Count now: %d\n", newCount) + return nil, gnet.None +} + +func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action { + s.connections.Delete(c) + + oldCount := s.streamer.activeConns.Load() + newCount := s.streamer.activeConns.Add(-1) + fmt.Printf("[TCP ATOMIC] OnClose: %d -> %d (expected: %d)\n", oldCount, newCount, oldCount-1) + + fmt.Printf("[TCP DEBUG] Connection closed. Count now: %d (err: %v)\n", newCount, err) + return gnet.None +} + +func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action { + // We don't expect input from clients, just discard + c.Discard(-1) + return gnet.None +} + +func (t *TCPStreamer) GetActiveConnections() int32 { + return t.activeConns.Load() +} \ No newline at end of file diff --git a/src/internal/stream/tcp.go b/src/internal/stream/tcpstreamer.go similarity index 77% rename from src/internal/stream/tcp.go rename to src/internal/stream/tcpstreamer.go index 71251f5..0683f58 100644 --- a/src/internal/stream/tcp.go +++ b/src/internal/stream/tcpstreamer.go @@ -1,4 +1,4 @@ -// FILE: src/internal/stream/tcp.go +// FILE: src/internal/stream/tcpstreamer.go package stream import ( @@ -18,25 +18,19 @@ type TCPStreamer struct { logChan chan monitor.LogEntry config config.TCPConfig server *tcpServer + done chan struct{} activeConns atomic.Int32 startTime time.Time - done chan struct{} engine *gnet.Engine wg sync.WaitGroup } -type tcpServer struct { - gnet.BuiltinEventEngine - streamer *TCPStreamer - connections sync.Map -} - func NewTCPStreamer(logChan chan monitor.LogEntry, cfg config.TCPConfig) *TCPStreamer { return &TCPStreamer{ logChan: logChan, config: cfg, - startTime: time.Now(), done: make(chan struct{}), + startTime: time.Now(), } } @@ -50,14 +44,14 @@ func (t *TCPStreamer) Start() error { t.broadcastLoop() }() - // Configure gnet with no-op logger + // Configure gnet addr := fmt.Sprintf("tcp://:%d", t.config.Port) // Run gnet in separate goroutine to avoid blocking errChan := make(chan error, 1) go func() { err := gnet.Run(t.server, addr, - gnet.WithLogger(noopLogger{}), // No-op logger: discard everything + gnet.WithLogger(noopLogger{}), gnet.WithMulticore(true), gnet.WithReusePort(true), ) @@ -83,7 +77,6 @@ func (t *TCPStreamer) Stop() { // Stop gnet engine if running if t.engine != nil { - // Use Stop() method to gracefully shutdown gnet ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() t.engine.Stop(ctx) @@ -107,7 +100,7 @@ func (t *TCPStreamer) broadcastLoop() { select { case entry, ok := <-t.logChan: if !ok { - return // Channel closed + return } data, err := json.Marshal(entry) if err != nil { @@ -155,27 +148,4 @@ func (t *TCPStreamer) formatHeartbeat() []byte { jsonData, _ := json.Marshal(data) return append(jsonData, '\n') -} - -func (s *tcpServer) OnBoot(eng gnet.Engine) gnet.Action { - s.streamer.engine = &eng - return gnet.None -} - -func (s *tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { - s.connections.Store(c, struct{}{}) - s.streamer.activeConns.Add(1) - return nil, gnet.None -} - -func (s *tcpServer) OnClose(c gnet.Conn, err error) gnet.Action { - s.connections.Delete(c) - s.streamer.activeConns.Add(-1) - return gnet.None -} - -func (s *tcpServer) OnTraffic(c gnet.Conn) gnet.Action { - // We don't expect input from clients, just discard - c.Discard(-1) - return gnet.None } \ No newline at end of file