Apache Druid: Real-Time Analytics Database
Apache Druid is a high-performance, distributed analytics database designed for real-time ingestion and fast querying of large-scale data. It's optimized for time-series data and provides sub-second query performance for analytical workloads.
Overview
Apache Druid is an open-source, distributed analytics database that excels at:
- Real-time Ingestion: Stream data processing with low latency
- Time-series Analytics: Optimized for time-based data analysis
- High Scalability: Distributed architecture for massive data volumes
- Fast Queries: Sub-second response times for analytical queries
- Column-oriented Storage: Efficient storage for analytical workloads
Key Features
🚀 Performance Features
- Real-time Ingestion: Stream data processing capabilities
- Column-oriented Storage: Optimized for analytical queries
- Distributed Architecture: Horizontal scaling across multiple nodes
- Sub-second Queries: Fast response times for complex analytics
- Time-based Partitioning: Efficient time-series data handling
🔧 Operational Features
- Fault Tolerance: Built-in replication and failover
- High Availability: No single point of failure
- Self-healing: Automatic recovery from node failures
- Multi-tenancy: Many concurrent users and datasources on a single cluster
- REST API: Comprehensive API for operations
📊 Data Model Features
- Time-series Optimization: Native time-based data handling
- Flexible Schema: Support for nested and complex data types
- Rollup Aggregations: Pre-computed aggregations for performance
- Data Retention: Configurable data lifecycle management
- Compression: Advanced data compression algorithms
Architecture
Core Components
                        ┌──────────────────┐
                        │  Router Service  │
                        │ (Query Routing)  │
                        └────────┬─────────┘
                                 │
           ┌─────────────────────┼─────────────────────┐
           │                     │                     │
   ┌───────▼────────┐    ┌───────▼────────┐    ┌───────▼────────┐
   │     Broker     │    │     Broker     │    │     Broker     │
   │    Service     │    │    Service     │    │    Service     │
   └───────┬────────┘    └───────┬────────┘    └───────┬────────┘
           │                     │                     │
           └─────────────────────┼─────────────────────┘
                                 │
           ┌─────────────────────┼─────────────────────┐
           │                     │                     │
   ┌───────▼────────┐    ┌───────▼────────┐    ┌───────▼────────┐
   │   Historical   │    │   Historical   │    │ Middle Manager │
   │    Service     │    │    Service     │    │  (Ingestion)   │
   └────────────────┘    └────────────────┘    └────────────────┘
The Coordinator service manages segment assignment and cluster coordination in the background; it is not on the query path shown above.
Component Roles
- Coordinator Service: Metadata management and cluster coordination
- Historical Service: Long-term data storage and query processing
- Middle Manager: Data ingestion and real-time processing
- Broker Service: Query routing and result aggregation
- Router Service: Request routing and load balancing
Installation
Docker Installation
# Pull Druid image
docker pull apache/druid:latest
# Run Druid with docker-compose
wget https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml
# Start Druid services
docker-compose up -d
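The containers can take a minute or two to become responsive. A minimal readiness check, sketched here under the assumption that the compose file exposes Druid's default ports (Router on 8888, Coordinator on 8081), polls each service's /status endpoint until it answers:
import time
import requests

# Poll each service's /status endpoint until it responds
# (assumes the default ports: Router 8888, Coordinator 8081).
SERVICES = {
    "router": "http://localhost:8888/status",
    "coordinator": "http://localhost:8081/status",
}

def wait_for_druid(timeout_s=180):
    deadline = time.time() + timeout_s
    pending = dict(SERVICES)
    while pending and time.time() < deadline:
        for name, url in list(pending.items()):
            try:
                if requests.get(url, timeout=5).status_code == 200:
                    print(f"{name} is up")
                    del pending[name]
            except requests.RequestException:
                pass  # not ready yet
        time.sleep(5)
    if pending:
        raise RuntimeError(f"services not ready: {sorted(pending)}")

if __name__ == "__main__":
    wait_for_druid()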
Native Installation
# Download Druid (substitute the current release version for 0.23.0)
wget https://downloads.apache.org/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0
# Start Druid services
bin/start-nano-quickstart
Cluster Setup
# On the Master server: start Coordinator and Overlord
# (use bin/start-cluster-master-with-zk-server to also run ZooKeeper here)
bin/start-cluster-master-no-zk-server
# On each Data server: start Historical and Middle Manager
bin/start-cluster-data-server
# On each Query server: start Broker and Router
bin/start-cluster-query-server
Configuration
Coordinator Configuration (conf/druid/cluster/master/coordinator/runtime.properties)
# Coordinator settings
druid.port=8081
druid.service=coordinator
# Metadata storage
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid;create=true
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=druid
# Zookeeper configuration
druid.zk.service.host=localhost
druid.zk.paths.base=/druid
# Memory configuration
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2
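To confirm that a running Coordinator actually picked up these settings, you can read them back from the service's /status/properties endpoint. A small sketch, assuming the Coordinator listens on the druid.port configured above:
import requests

# Read the runtime properties the Coordinator is actually using
# (assumes druid.port=8081 as configured above).
resp = requests.get("http://localhost:8081/status/properties", timeout=10)
resp.raise_for_status()
props = resp.json()

for key in (
    "druid.metadata.storage.type",
    "druid.zk.service.host",
    "druid.processing.buffer.sizeBytes",
):
    print(key, "=", props.get(key))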
Historical Configuration (conf/druid/cluster/data/historical/runtime.properties)
# Historical settings
druid.port=8083
druid.service=historical
# Storage configuration
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2
druid.processing.numThreads=1
# Cache configuration
druid.cache.type=local
druid.cache.sizeInBytes=1000000000
# Segment storage
druid.segmentCache.locations=[{"path":"/tmp/druid/indexCache","maxSize":1000000000}]
Middle Manager Configuration (conf/druid/cluster/data/middleManager/runtime.properties)
# Middle Manager settings
druid.port=8091
druid.service=middleManager
# Worker configuration
druid.worker.capacity=3
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2
druid.processing.numThreads=1
# Task configuration
druid.indexer.runner.javaOpts=-server -Xmx2g -Xms2g
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=256000000
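To verify that Middle Manager workers registered with the configured capacity, you can ask the Overlord for its worker list. A sketch, assuming the Overlord runs on its default port 8090 (the exact response fields may vary by Druid version):
import requests

# List Middle Manager workers known to the Overlord and their capacity
# (assumes the Overlord's default port 8090; use 8081 for a combined
# Coordinator-Overlord process).
resp = requests.get("http://localhost:8090/druid/indexer/v1/workers", timeout=10)
resp.raise_for_status()

for entry in resp.json():
    worker = entry.get("worker", {})
    print(worker.get("host"),
          "capacity:", worker.get("capacity"),
          "in use:", entry.get("currCapacityUsed"))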
Data Ingestion
Batch Ingestion
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "sales_events",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": ["customer_id", "product_id", "region", "category"]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "revenue",
"fieldName": "amount"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/path/to/data",
"filter": "*.json"
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "dynamic"
}
}
}
}
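Batch ingestion specs like the one above are submitted to the Overlord's task endpoint. A minimal submit-and-poll sketch, assuming the spec is saved as ingestion-spec.json (an assumed filename) and the Overlord is reachable on its default port 8090:
import json
import time
import requests

OVERLORD = "http://localhost:8090"  # use :8081 for a combined Coordinator-Overlord

# Submit the batch ingestion spec shown above (saved as ingestion-spec.json).
with open("ingestion-spec.json") as f:
    spec = json.load(f)

resp = requests.post(f"{OVERLORD}/druid/indexer/v1/task", json=spec, timeout=30)
resp.raise_for_status()
task_id = resp.json()["task"]

# Poll the task status until it finishes.
while True:
    status = requests.get(
        f"{OVERLORD}/druid/indexer/v1/task/{task_id}/status", timeout=10
    ).json()["status"]["status"]
    print("task", task_id, "->", status)
    if status in ("SUCCESS", "FAILED"):
        break
    time.sleep(10)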
Real-time Ingestion
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "real_time_events",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": ["user_id", "event_type", "page_url"]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "MINUTE",
"rollup": true
}
},
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"topic": "events",
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
}
}
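Kafka specs are not one-off tasks; they are submitted as supervisors, which the Overlord keeps running and restarts as needed. A sketch, assuming the spec above is saved as kafka-supervisor.json (an assumed filename) and the Overlord uses its default port 8090:
import json
import requests

OVERLORD = "http://localhost:8090"  # use :8081 for a combined Coordinator-Overlord

# Submit the Kafka supervisor spec shown above (saved as kafka-supervisor.json).
with open("kafka-supervisor.json") as f:
    spec = json.load(f)

resp = requests.post(f"{OVERLORD}/druid/indexer/v1/supervisor", json=spec, timeout=30)
resp.raise_for_status()
print("supervisor created:", resp.json())

# List active supervisors to confirm it is running.
print(requests.get(f"{OVERLORD}/druid/indexer/v1/supervisor", timeout=10).json())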
Query Language (Druid SQL)
Basic Queries
-- Simple SELECT
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
LIMIT 100;
-- Aggregation queries
SELECT
region,
COUNT(*) as total_sales,
SUM(revenue) as total_revenue,
AVG(revenue) as avg_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region
ORDER BY total_revenue DESC;
-- Time-based aggregations
SELECT
TIME_FLOOR(__time, 'PT1H') as hour,
COUNT(*) as events_per_hour,
SUM(revenue) as hourly_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY TIME_FLOOR(__time, 'PT1H')
ORDER BY hour;
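These queries can also be issued over the SQL HTTP API with bound parameters instead of hard-coded timestamps. A sketch against the Broker's SQL endpoint, assuming its default port 8082:
import requests

# Run the hourly aggregation with a bound timestamp parameter instead of a
# literal (Broker SQL endpoint, default port 8082).
payload = {
    "query": """
        SELECT TIME_FLOOR(__time, 'PT1H') AS hour, COUNT(*) AS events_per_hour
        FROM sales_events
        WHERE __time >= ?
        GROUP BY TIME_FLOOR(__time, 'PT1H')
        ORDER BY hour
    """,
    "parameters": [
        {"type": "TIMESTAMP", "value": "2024-01-01 00:00:00"}
    ],
}

resp = requests.post("http://localhost:8082/druid/v2/sql", json=payload, timeout=60)
resp.raise_for_status()
for row in resp.json():
    print(row)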
Advanced Analytics
-- Window functions (available in recent Druid releases)
SELECT
customer_id,
__time,
revenue,
SUM(revenue) OVER (
PARTITION BY customer_id
ORDER BY __time
ROWS UNBOUNDED PRECEDING
) as running_total
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
ORDER BY customer_id, __time;
-- Top N analysis (ranked via a subquery; Druid SQL has no QUALIFY clause)
SELECT region, product_id, total_revenue
FROM (
  SELECT
    region,
    product_id,
    SUM(revenue) as total_revenue,
    ROW_NUMBER() OVER (
      PARTITION BY region
      ORDER BY SUM(revenue) DESC
    ) as rn
  FROM sales_events
  WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
  GROUP BY region, product_id
) AS ranked
WHERE rn <= 5
ORDER BY region, total_revenue DESC;
Native Druid Queries
{
"queryType": "groupBy",
"dataSource": "sales_events",
"granularity": "day",
"dimensions": ["region", "category"],
"aggregations": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "revenue",
"fieldName": "revenue"
}
],
"intervals": ["2024-01-01/2024-02-01"],
"filter": {
"type": "selector",
"dimension": "region",
"value": "North America"
}
}
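Native queries are POSTed as JSON to the Broker's /druid/v2 endpoint. A sketch that runs the groupBy above, assuming the Broker's default port 8082:
import requests

# POST the native groupBy query shown above to the Broker (default port 8082).
native_query = {
    "queryType": "groupBy",
    "dataSource": "sales_events",
    "granularity": "day",
    "dimensions": ["region", "category"],
    "aggregations": [
        {"type": "count", "name": "count"},
        {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"},
    ],
    "intervals": ["2024-01-01/2024-02-01"],
    "filter": {"type": "selector", "dimension": "region", "value": "North America"},
}

resp = requests.post("http://localhost:8082/druid/v2/", json=native_query, timeout=60)
resp.raise_for_status()
for row in resp.json():
    # groupBy results carry the grouped values and aggregations under "event"
    print(row["timestamp"], row["event"])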
Performance Optimization
Query Optimization
-- Use time-based filters
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
AND __time < TIMESTAMP '2024-02-01 00:00:00';
-- Use appropriate granularity
SELECT
TIME_FLOOR(__time, 'PT1H') as hour,
COUNT(*) as events
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY TIME_FLOOR(__time, 'PT1H');
-- Use LIMIT for large result sets
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
LIMIT 1000;
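To check whether a query keeps the time filter and granularity you expect, Druid SQL's EXPLAIN PLAN FOR shows the native query a SQL statement compiles to. A sketch against the Broker's SQL endpoint (default port 8082 assumed):
import requests

# Inspect the native plan that a SQL query compiles to (Broker, port 8082).
sql = """
EXPLAIN PLAN FOR
SELECT TIME_FLOOR(__time, 'PT1H') AS hour, COUNT(*) AS events
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY TIME_FLOOR(__time, 'PT1H')
"""

resp = requests.post("http://localhost:8082/druid/v2/sql",
                     json={"query": sql}, timeout=60)
resp.raise_for_status()
for row in resp.json():
    print(row["PLAN"])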
Data Modeling
{
"dimensionsSpec": {
"dimensions": [
{
"type": "string",
"name": "customer_id"
},
{
"type": "string",
"name": "product_id"
},
{
"type": "long",
"name": "timestamp"
}
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "revenue",
"fieldName": "amount"
},
{
"type": "doubleMin",
"name": "min_amount",
"fieldName": "amount"
},
{
"type": "doubleMax",
"name": "max_amount",
"fieldName": "amount"
}
]
}
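Before committing to a dimension list, it helps to check how many distinct values each candidate column has in a data sample, since high-cardinality dimensions reduce rollup effectiveness. A small local sketch, assuming a newline-delimited JSON sample (sample.json is a hypothetical path):
import json
from collections import defaultdict

# Profile distinct-value counts per candidate dimension in a sample file
# (sample.json is a hypothetical newline-delimited JSON export).
CANDIDATES = ["customer_id", "product_id", "region", "category"]
distinct = defaultdict(set)
rows = 0

with open("sample.json") as f:
    for line in f:
        event = json.loads(line)
        rows += 1
        for dim in CANDIDATES:
            if dim in event:
                distinct[dim].add(event[dim])

for dim in CANDIDATES:
    print(f"{dim}: {len(distinct[dim])} distinct values across {rows} rows")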
Monitoring and Maintenance
Health Checks
# Check coordinator status
curl -X GET http://localhost:8081/status
# Check historical status
curl -X GET http://localhost:8083/status
# Check broker status
curl -X GET http://localhost:8082/status
# Check middle manager status
curl -X GET http://localhost:8091/status
System Queries
-- Segment count and size per datasource
SELECT
  datasource,
  COUNT(*) as num_segments,
  SUM("size") as size_bytes
FROM sys.segments
GROUP BY datasource
ORDER BY size_bytes DESC;
-- Check recent ingestion tasks (Druid has no query-history system table;
-- query timings are exposed via request logs and metrics)
SELECT
  task_id,
  datasource,
  status,
  duration,
  created_time
FROM sys.tasks
ORDER BY created_time DESC
LIMIT 20;
Maintenance Tasks
# Cancel a running SQL query by its sqlQueryId (Broker)
curl -X DELETE http://localhost:8082/druid/v2/sql/query-id
# Enable automatic compaction for a datasource (Coordinator)
curl -X POST http://localhost:8081/druid/coordinator/v1/config/compaction \
-H "Content-Type: application/json" \
-d '{"dataSource": "datasource-name"}'
# Shut down a task (Overlord; use port 8081 for a combined Coordinator-Overlord)
curl -X POST http://localhost:8090/druid/indexer/v1/task/task-id/shutdown
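These maintenance calls can also be scripted. For example, a sketch that lists running ingestion tasks on the Overlord and shuts down any that have been running longer than a threshold (the threshold and the Overlord port 8090 are assumptions):
from datetime import datetime, timedelta, timezone
import requests

OVERLORD = "http://localhost:8090"  # use :8081 for a combined Coordinator-Overlord
MAX_AGE = timedelta(hours=6)        # assumed threshold

# List running tasks and shut down those older than MAX_AGE.
resp = requests.get(f"{OVERLORD}/druid/indexer/v1/runningTasks", timeout=10)
resp.raise_for_status()

now = datetime.now(timezone.utc)
for task in resp.json():
    created = datetime.fromisoformat(task["createdTime"].replace("Z", "+00:00"))
    if now - created > MAX_AGE:
        task_id = task["id"]
        print("shutting down", task_id)
        requests.post(f"{OVERLORD}/druid/indexer/v1/task/{task_id}/shutdown",
                      timeout=10).raise_for_status()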
Integration Examples
Python Integration
import requests
import json
import pandas as pd
# Druid query endpoint
druid_url = "http://localhost:8082/druid/v2/sql"
# SQL query
query = {
"query": """
SELECT
region,
COUNT(*) as total_sales,
SUM(revenue) as total_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region
ORDER BY total_revenue DESC
"""
}
# Execute query
response = requests.post(druid_url, json=query)
results = response.json()
# Convert to DataFrame
df = pd.DataFrame(results)
print(df)
Java Integration
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import java.util.Map;
import java.util.List;
public class DruidClient {
private final OkHttpClient client = new OkHttpClient();
private final ObjectMapper mapper = new ObjectMapper();
private final String druidUrl = "http://localhost:8082/druid/v2/sql";
public List<Map<String, Object>> executeQuery(String sql) throws Exception {
String json = mapper.writeValueAsString(Map.of("query", sql));
RequestBody body = RequestBody.create(
MediaType.parse("application/json"), json
);
Request request = new Request.Builder()
.url(druidUrl)
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
String responseBody = response.body().string();
return mapper.readValue(responseBody, List.class);
}
}
public static void main(String[] args) throws Exception {
DruidClient client = new DruidClient();
String sql = """
SELECT region, COUNT(*) as total_sales
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region
""";
List<Map<String, Object>> results = client.executeQuery(sql);
results.forEach(System.out::println);
}
}
Best Practices
Data Modeling
- Choose Appropriate Granularity: Balance query performance vs storage
- Use Rollup Aggregations: Pre-compute common aggregations
- Optimize Dimensions: Keep dimension cardinality manageable
- Use Time-based Partitioning: Leverage Druid's time-series optimization
Performance
- Use Time Filters: Always include time-based filters in queries
- Limit Result Sets: Use LIMIT for large result sets
- Optimize Granularity: Choose appropriate query granularity
- Use Approximate Aggregations: Use APPROX_COUNT_DISTINCT (hyperUnique) for high-cardinality distinct counts, as in the sketch below
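As an example of the last point, a sketch comparing an exact and an approximate distinct count over the Broker's SQL endpoint (datasource and column names follow the earlier examples; port 8082 assumed):
import requests

# Compare exact vs. approximate distinct counts (Broker SQL endpoint, port 8082).
def run_sql(query, context=None):
    payload = {"query": query}
    if context:
        payload["context"] = context
    resp = requests.post("http://localhost:8082/druid/v2/sql",
                         json=payload, timeout=60)
    resp.raise_for_status()
    return resp.json()

# Force an exact count by disabling approximate COUNT(DISTINCT).
exact = run_sql("SELECT COUNT(DISTINCT customer_id) AS n FROM sales_events",
                context={"useApproximateCountDistinct": False})
approx = run_sql("SELECT APPROX_COUNT_DISTINCT(customer_id) AS n FROM sales_events")
print("exact:", exact[0]["n"], "approx:", approx[0]["n"])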
Operations
- Monitor Segments: Regularly check segment health and size
- Compact Segments: Periodically compact small segments
- Tune Memory: Configure appropriate memory settings
- Backup Metadata: Regular backup of metadata storage
Troubleshooting
Common Issues
- Ingestion Failures
# Check task logs (Overlord APIs; use port 8081 for a combined Coordinator-Overlord)
curl -X GET http://localhost:8090/druid/indexer/v1/task/task-id/log
# Check task status
curl -X GET http://localhost:8090/druid/indexer/v1/task/task-id/status
- Query Performance Issues
-- Druid has no sys.query table; slow queries are captured by the Broker's
-- request logging (druid.request.logging.*) and the query/time metrics.
-- To inspect how a slow SQL query is planned:
EXPLAIN PLAN FOR
SELECT region, COUNT(*) as total_sales
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region;
- Memory Issues
# Check JVM memory usage
jstat -gc <druid_pid>
# Check heap dump
jmap -dump:format=b,file=heap.hprof <druid_pid>
Debug Mode
# Enable debug logging (for a native install you can also set the log level
# in the log4j2.xml shipped with the distribution)
export DRUID_LOG_LEVEL=DEBUG
# Check detailed logs (paths vary by install; the quickstart scripts write
# to the log/ directory under the Druid installation)
tail -f /var/log/druid/coordinator.log
tail -f /var/log/druid/historical.log
tail -f /var/log/druid/broker.log
Resources and References
Official Resources
- Documentation: druid.apache.org/docs
- GitHub: github.com/apache/druid
- Community: druid.apache.org/community
Related Tools
- Druid Console: Web-based management interface
- Imply: Commercial Druid distribution with additional tooling
- Druid Connectors: Various data source connectors
Learning Resources
- SQL Reference: Complete Druid SQL documentation
- Performance Tuning Guide: Optimization best practices
- Architecture Guide: Deep dive into Druid architecture