Logging · Observability · ELK · DevOps

Log Aggregation and Analysis at Scale

Collect, store, and analyze logs from distributed systems, from the ELK stack to cloud solutions to effective log queries.

Bootspring Team
Engineering
September 20, 2024
5 min read

Logs are essential for debugging, auditing, and understanding system behavior. At scale, you need centralized logging that's searchable, analyzable, and cost-effective.

Log Aggregation Architecture#

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  Service A  │  │  Service B  │  │  Service C  │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       └────────────────┼────────────────┘
                        │
                        ▼
               ┌─────────────────┐
               │   Log Shipper   │
               │    (Fluentd/    │
               │    Filebeat)    │
               └────────┬────────┘
                        │
                        ▼
               ┌─────────────────┐
               │  Message Queue  │
               │     (Kafka)     │
               └────────┬────────┘
                        │
                        ▼
               ┌─────────────────┐
               │   Log Storage   │
               │ (Elasticsearch/ │
               │   Loki/Cloud)   │
               └────────┬────────┘
                        │
                        ▼
               ┌─────────────────┐
               │  Visualization  │
               │(Kibana/Grafana) │
               └─────────────────┘
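A minimal way to stand up the Elasticsearch leg of this pipeline locally is Docker Compose. The sketch below assumes a single-node Elasticsearch, skips Kafka, and uses a hypothetical ./fluentd build that bakes in fluent-plugin-elasticsearch; image tags are illustrative:

# docker-compose.yml (sketch)
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.14.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:8.14.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on: [elasticsearch]
  fluentd:
    build: ./fluentd   # hypothetical image with fluent-plugin-elasticsearch installed
    volumes:
      - ./fluent.conf:/fluentd/etc/fluent.conf:ro
      - /var/log/app:/var/log/app:ro
    depends_on: [elasticsearch]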

Structured Logging#

Log Format#

// Structured JSON logs
interface LogEntry {
  timestamp: string;     // ISO 8601
  level: string;         // info, warn, error
  message: string;       // Human-readable message
  service: string;       // Service name
  environment: string;   // prod, staging, dev
  traceId?: string;      // Distributed tracing
  spanId?: string;
  userId?: string;       // Context
  requestId?: string;
  error?: {
    name: string;
    message: string;
    stack: string;
  };
  metadata?: Record<string, unknown>;
}

// Example output
{
  "timestamp": "2024-09-20T10:30:00.000Z",
  "level": "error",
  "message": "Failed to process payment",
  "service": "payment-service",
  "environment": "production",
  "traceId": "abc123",
  "requestId": "req-456",
  "userId": "user-789",
  "error": {
    "name": "PaymentError",
    "message": "Card declined",
    "stack": "Error: Card declined\n at processPayment..."
  },
  "metadata": {
    "orderId": "order-111",
    "amount": 99.99
  }
}

Implementation#

import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  base: {
    service: process.env.SERVICE_NAME,
    environment: process.env.NODE_ENV,
    version: process.env.APP_VERSION,
  },
  timestamp: () => `,"timestamp":"${new Date().toISOString()}"`,
});

// Create child logger with request context
function createRequestLogger(req: Request) {
  return logger.child({
    requestId: req.headers['x-request-id'],
    traceId: req.headers['x-trace-id'],
    userId: req.user?.id,
  });
}

// Usage
app.use((req, res, next) => {
  req.log = createRequestLogger(req);
  req.log.info({ path: req.path, method: req.method }, 'Request started');
  next();
});
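When logging caught exceptions, pino's standard error serializer produces the name/message/stack shape from the schema above. A sketch, where processPayment and the order type are hypothetical stand-ins:

import pino from 'pino';

declare function processPayment(order: { id: string }): Promise<void>; // hypothetical

const logger = pino({
  // pino.stdSerializers.err emits { type, message, stack } for Error values
  serializers: { err: pino.stdSerializers.err },
});

async function handlePayment(order: { id: string }) {
  try {
    await processPayment(order);
  } catch (err) {
    // Put the Error under the `err` key so the serializer runs
    logger.error({ err, orderId: order.id }, 'Failed to process payment');
    throw err;
  }
}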

Log Collection#

Fluentd Configuration#

# fluent.conf
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.log.pos
  tag app.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</source>

<filter app.logs>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT']}"
  </record>
</filter>

<match app.logs>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name logs-${tag}-%Y.%m.%d
  # tag and time chunk keys are required for the ${tag} and %Y.%m.%d
  # placeholders in index_name to resolve
  <buffer tag, time>
    @type memory
    timekey 1d
    flush_interval 5s
    chunk_limit_size 5MB
    retry_max_interval 30
  </buffer>
</match>
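Filebeat, the other shipper named in the diagram, handles the same tail-and-parse job. A rough equivalent sketch, assuming the same log path and NDJSON-formatted output (the input id is illustrative):

# filebeat.yml (sketch)
filebeat.inputs:
  - type: filestream
    id: app-logs
    paths:
      - /var/log/app/*.log
    parsers:
      - ndjson:
          target: ""           # merge parsed JSON into the root of the event
          overwrite_keys: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]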

Kubernetes Logging#

# DaemonSet for log collection
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd   # must match the selector above
    spec:
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch"
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: containers
              mountPath: /var/lib/docker/containers
              readOnly: true
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: containers
          hostPath:
            path: /var/lib/docker/containers
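The fluentd-kubernetes-daemonset image also bundles the kubernetes_metadata filter, which enriches each record with pod, namespace, and label context; enabling it is one stanza in the Fluentd config:

# Enrich container logs with pod/namespace/label metadata
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>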

Storage Solutions#

Elasticsearch#

# Common queries
# Find errors in last hour
GET logs-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "level": "error" } },
        { "range": { "timestamp": { "gte": "now-1h" } } }
      ]
    }
  },
  "sort": [{ "timestamp": "desc" }],
  "size": 100
}

# Aggregate errors by service
GET logs-*/_search
{
  "size": 0,
  "query": {
    "range": { "timestamp": { "gte": "now-24h" } }
  },
  "aggs": {
    "by_service": {
      "terms": { "field": "service" },
      "aggs": {
        "error_count": {
          "filter": { "term": { "level": "error" } }
        }
      }
    }
  }
}
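The term filter and terms aggregation above assume level and service are mapped as keyword fields, which default dynamic mapping won't give you on the top-level field name. One way to pin that down is an index template; a sketch with illustrative names:

# Map log fields explicitly so term queries and aggregations behave
PUT _index_template/logs-mappings
{
  "index_patterns": ["logs-*"],
  "template": {
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" },
        "level":     { "type": "keyword" },
        "service":   { "type": "keyword" },
        "message":   { "type": "text" }
      }
    }
  }
}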

Grafana Loki#

# Loki config
auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1

schema_config:
  configs:
    - from: 2024-01-01
      store: boltdb-shipper
      object_store: s3
      schema: v12
      index:
        prefix: loki_index_
        period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/index
    cache_location: /loki/cache
  aws:
    s3: s3://region/bucket-name
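Logs reach Loki through an agent such as Promtail. A minimal scrape sketch, with illustrative labels and paths:

# promtail.yml (sketch)
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: app
    static_configs:
      - targets: [localhost]
        labels:
          service: api
          __path__: /var/log/app/*.log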
# LogQL queries
# Find errors
{service="api"} |= "error"

# JSON parsing
{service="api"} | json | level="error"

# With metrics
sum(rate({service="api"} | json | level="error" [5m]))
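The metric form also feeds alerting through the Loki ruler, which evaluates Prometheus-style rules over LogQL expressions; a sketch with an illustrative threshold:

# Loki ruler rule: fire when the API logs errors at a sustained rate
groups:
  - name: api-errors
    rules:
      - alert: HighErrorLogRate
        expr: sum(rate({service="api"} | json | level="error" [5m])) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: api service logging errors above 1/s for 10 minutes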

Log Retention#

# Elasticsearch Index Lifecycle Management
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
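A policy does nothing until it's attached to indices, typically via an index template (this could be merged with the mapping template shown earlier); the rollover alias here is illustrative:

PUT _index_template/logs-ilm
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs"
    }
  }
}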

Best Practices#

What to Log#

// ✅ Log these (pino argument order: context object first, message second)
logger.info({ path, method, userId }, 'Request received');
logger.info({ event: 'order_placed', orderId, total }, 'Business event');
logger.error({ operation, error, context }, 'Operation failed');
logger.warn({ latency, threshold }, 'Degraded performance');

// ❌ Don't log these
logger.info('Password: ' + password);    // Sensitive data
logger.info({ body }, 'Request body');   // May contain PII
logger.debug('Loop iteration');          // Too verbose for production
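Call-site discipline is fragile, so it's worth a backstop: pino can redact known sensitive paths at the logger level. A sketch with illustrative paths:

import pino from 'pino';

const logger = pino({
  redact: {
    // fast-redact path syntax; the wildcard matches any parent key
    paths: ['password', 'req.headers.authorization', '*.creditCard'],
    censor: '[REDACTED]',
  },
});

logger.info({ user: 'user-789', password: 'hunter2' }, 'Login attempt');
// => ...,"password":"[REDACTED]","msg":"Login attempt"...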

Log Sampling#

// Sample verbose logs
function shouldLog(sampleRate: number): boolean {
  return Math.random() < sampleRate;
}

// Log 10% of successful requests
if (response.ok && shouldLog(0.1)) {
  logger.debug({ details }, 'Request succeeded');
}

// Always log errors
if (!response.ok) {
  logger.error({ details }, 'Request failed');
}
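One caveat: Math.random() decides per log line, so a single request's logs can end up half kept, half dropped. Hashing the trace ID instead makes the decision deterministic per trace; a sketch:

import { createHash } from 'node:crypto';

// Hash the trace ID into [0, 1) so sampling is deterministic per trace:
// every log line for a given trace is either kept or dropped together.
function shouldLogTrace(traceId: string, sampleRate: number): boolean {
  const hash = createHash('sha1').update(traceId).digest();
  return hash.readUInt32BE(0) / 0x100000000 < sampleRate;
}

// Keep ~10% of traces, each with all of its logs
if (response.ok && shouldLogTrace(traceId, 0.1)) {
  logger.debug({ details }, 'Request succeeded');
}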

Conclusion#

Effective log aggregation requires structured logging, efficient collection, and smart retention. Use structured JSON logs for parseability, centralize logs for searchability, and implement retention policies for cost control.

Logs complement metrics and traces—together they provide complete observability into your systems.
