Logs are essential for debugging, auditing, and understanding system behavior. At scale, you need centralized logging that's searchable, analyzable, and cost-effective.
Log Aggregation Architecture#
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Service A │ │ Service B │ │ Service C │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└────────────────┼────────────────┘
│
▼
┌─────────────────┐
│ Log Shipper │
│ (Fluentd/ │
│ Filebeat) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Message Queue │
│ (Kafka) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Log Storage │
│ (Elasticsearch/ │
│ Loki/Cloud) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Visualization │
│(Kibana/Grafana) │
└─────────────────┘
Structured Logging#
Log Format#
// Structured JSON logs
interface LogEntry {
  timestamp: string;        // ISO 8601
  level: string;            // info, warn, error
  message: string;          // Human-readable message
  service: string;          // Service name
  environment: string;      // prod, staging, dev
  traceId?: string;         // Distributed tracing
  spanId?: string;
  userId?: string;          // Context
  requestId?: string;
  error?: {
    name: string;
    message: string;
    stack: string;
  };
  metadata?: Record<string, unknown>;
}
// Example output
{
  "timestamp": "2024-09-20T10:30:00.000Z",
  "level": "error",
  "message": "Failed to process payment",
  "service": "payment-service",
  "environment": "production",
  "traceId": "abc123",
  "requestId": "req-456",
  "userId": "user-789",
  "error": {
    "name": "PaymentError",
    "message": "Card declined",
    "stack": "Error: Card declined\n    at processPayment..."
  },
  "metadata": {
    "orderId": "order-111",
    "amount": 99.99
  }
}
Implementation#
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  base: {
    service: process.env.SERVICE_NAME,
    environment: process.env.NODE_ENV,
    version: process.env.APP_VERSION,
  },
  timestamp: () => `,"timestamp":"${new Date().toISOString()}"`,
});

// Create child logger with request context
function createRequestLogger(req: Request) {
  return logger.child({
    requestId: req.headers['x-request-id'],
    traceId: req.headers['x-trace-id'],
    userId: req.user?.id,
  });
}

// Usage
app.use((req, res, next) => {
  req.log = createRequestLogger(req);
  req.log.info({ path: req.path, method: req.method }, 'Request started');
  next();
});
Log Collection#
Fluentd Configuration#
# fluent.conf
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.log.pos
  tag app.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</source>

<filter app.logs>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT']}"
  </record>
</filter>

<match app.logs>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name logs-${tag}-%Y.%m.%d
  <buffer>
    @type memory
    flush_interval 5s
    chunk_limit_size 5MB
    retry_max_interval 30
  </buffer>
</match>
Kubernetes Logging#
# DaemonSet for log collection
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    spec:
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch"
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: containers
              mountPath: /var/lib/docker/containers
              readOnly: true
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: containers
          hostPath:
            path: /var/lib/docker/containers
Storage Solutions#
Elasticsearch#
# Common queries
# Find errors in last hour
GET logs-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "level": "error" } },
        { "range": { "timestamp": { "gte": "now-1h" } } }
      ]
    }
  },
  "sort": [{ "timestamp": "desc" }],
  "size": 100
}

# Aggregate errors by service
GET logs-*/_search
{
  "size": 0,
  "query": {
    "range": { "timestamp": { "gte": "now-24h" } }
  },
  "aggs": {
    "by_service": {
      "terms": { "field": "service" },
      "aggs": {
        "error_count": {
          "filter": { "term": { "level": "error" } }
        }
      }
    }
  }
}
Grafana Loki#
# Loki config
auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1

schema_config:
  configs:
    - from: 2024-01-01
      store: boltdb-shipper
      object_store: s3
      schema: v12
      index:
        prefix: loki_index_
        period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/index
    cache_location: /loki/cache
  aws:
    s3: s3://region/bucket-name

# LogQL queries
# Find errors
{service="api"} |= "error"

# JSON parsing
{service="api"} | json | level="error"

# With metrics
sum(rate({service="api"} | json | level="error" [5m]))
Log Retention#
# Elasticsearch Index Lifecycle Management
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
Best Practices#
What to Log#
// ✅ Log these
logger.info('Request received', { path, method, userId });
logger.info('Business event', { event: 'order_placed', orderId, total });
logger.error('Operation failed', { operation, error, context });
logger.warn('Degraded performance', { latency, threshold });

// ❌ Don't log these
logger.info('Password: ' + password); // Sensitive data
logger.info('Request body', { body }); // May contain PII
logger.debug('Loop iteration'); // Too verbose for production
Log Sampling#
// Sample verbose logs
function shouldLog(sampleRate: number): boolean {
  return Math.random() < sampleRate;
}

// Log 10% of successful requests
if (response.ok && shouldLog(0.1)) {
  logger.debug('Request succeeded', { details });
}

// Always log errors
if (!response.ok) {
  logger.error('Request failed', { details });
}
Conclusion#
Effective log aggregation requires structured logging, efficient collection, and smart retention. Use structured JSON logs for parseability, centralize logs for searchability, and implement retention policies for cost control.
Logs complement metrics and traces—together they provide complete observability into your systems.