
Apache Druid: Real-Time Analytics Database

Apache Druid is a high-performance, distributed analytics database designed for real-time ingestion and fast querying of large-scale data. It's optimized for time-series data and provides sub-second query performance for analytical workloads.

Overview

Apache Druid is an open-source, distributed analytics database that excels at:

  • Real-time Ingestion: Stream data processing with low latency
  • Time-series Analytics: Optimized for time-based data analysis
  • High Scalability: Distributed architecture for massive data volumes
  • Fast Queries: Sub-second response times for analytical queries
  • Column-oriented Storage: Efficient storage for analytical workloads

Key Features

🚀 Performance Features

  • Real-time Ingestion: Stream data processing capabilities
  • Column-oriented Storage: Optimized for analytical queries
  • Distributed Architecture: Horizontal scaling across multiple nodes
  • Sub-second Queries: Fast response times for complex analytics
  • Time-based Partitioning: Efficient time-series data handling

🔧 Operational Features

  • Fault Tolerance: Built-in replication and failover
  • High Availability: No single point of failure
  • Self-healing: Automatic recovery from node failures
  • Multi-tenancy: Support for multiple data sources
  • REST API: Comprehensive API for operations

📊 Data Model Features

  • Time-series Optimization: Native time-based data handling
  • Flexible Schema: Support for nested and complex data types
  • Rollup Aggregations: Pre-computed aggregations for performance
  • Data Retention: Configurable data lifecycle management
  • Compression: Advanced data compression algorithms

Architecture

Core Components

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Coordinator   │    │   Coordinator   │    │   Coordinator   │
│     Service     │    │     Service     │    │     Service     │
└────────┬────────┘    └────────┬────────┘    └────────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                   ┌────────────┴────────────┐
                   │      Load Balancer      │
                   └────────────┬────────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
┌────────┴────────┐    ┌────────┴────────┐    ┌────────┴────────┐
│   Historical    │    │   Historical    │    │   Historical    │
│     Service     │    │     Service     │    │     Service     │
└────────┬────────┘    └────────┬────────┘    └────────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                   ┌────────────┴────────────┐
                   │     Middle Manager      │
                   │       (Ingestion)       │
                   └─────────────────────────┘

Component Roles

  • Coordinator Service: Metadata management and cluster coordination
  • Historical Service: Long-term data storage and query processing
  • Middle Manager: Data ingestion and real-time processing
  • Broker Service: Query routing and result aggregation
  • Router Service: Request routing and load balancing
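
Each of these processes exposes a /status endpoint, which is a quick way to confirm which role is running on which port. A minimal sketch in Python, assuming the default single-machine ports used elsewhere in this guide (8081 coordinator, 8082 broker, 8083 historical, 8091 middle manager):

import requests

# Default ports used elsewhere in this guide; adjust to your deployment
SERVICES = {
    "coordinator": 8081,
    "broker": 8082,
    "historical": 8083,
    "middleManager": 8091,
}

for name, port in SERVICES.items():
    try:
        # /status reports the Druid version and loaded modules for the process
        info = requests.get(f"http://localhost:{port}/status", timeout=5).json()
        print(f"{name}: up (Druid {info.get('version', 'unknown')})")
    except requests.RequestException as exc:
        print(f"{name}: unreachable ({exc})")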

Installation

Docker Installation

# Pull Druid image
docker pull apache/druid:latest

# Run Druid with docker-compose
wget https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml

# Start Druid services
docker-compose up -d

Native Installation

# Download Druid
wget https://downloads.apache.org/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0

# Start Druid services
bin/start-nano-quickstart
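
After starting the quickstart, the router typically serves the web console and proxies APIs on port 8888. A minimal sketch that polls its /status/health endpoint until the cluster is ready (the 8888 port is the quickstart default and an assumption here):

import time
import requests

# Quickstart router default port (assumption; adjust for your setup)
ROUTER_HEALTH = "http://localhost:8888/status/health"

# Poll until the router reports healthy, or give up after about a minute
for _ in range(30):
    try:
        if requests.get(ROUTER_HEALTH, timeout=2).json() is True:
            print("Druid is up")
            break
    except requests.RequestException:
        pass
    time.sleep(2)
else:
    raise SystemExit("Druid did not become healthy in time")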

Cluster Setup

# Start Coordinator
bin/start-coordinator.sh

# Start Historical
bin/start-historical.sh

# Start Middle Manager
bin/start-middleManager.sh

# Start Broker
bin/start-broker.sh

# Start Router
bin/start-router.sh

Configuration

Coordinator Configuration (conf/druid/cluster/master/coordinator/runtime.properties)

# Coordinator settings
druid.port=8081
druid.service=coordinator

# Metadata storage
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid;create=true
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=druid

# Zookeeper configuration
druid.zk.service.host=localhost
druid.zk.paths.base=/druid

# Memory configuration
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2

Historical Configuration (conf/druid/cluster/data/historical/runtime.properties)

# Historical settings
druid.port=8083
druid.service=historical

# Storage configuration
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2
druid.processing.numThreads=1

# Cache configuration
druid.cache.type=local
druid.cache.sizeInBytes=1000000000

# Segment storage
druid.segmentCache.locations=[{"path":"/tmp/druid/indexCache","maxSize":1000000000}]

Middle Manager Configuration (conf/druid/cluster/data/middleManager/runtime.properties)

# Middle Manager settings
druid.port=8091
druid.service=middleManager

# Worker configuration
druid.worker.capacity=3
druid.processing.buffer.sizeBytes=256000000
druid.processing.numMergeBuffers=2
druid.processing.numThreads=1

# Task configuration
druid.indexer.runner.javaOpts=-server -Xmx2g -Xms2g
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=256000000

Data Ingestion

Batch Ingestion

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "sales_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["customer_id", "product_id", "region", "category"]
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "doubleSum", "name": "revenue", "fieldName": "amount" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "HOUR",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "/path/to/data",
        "filter": "*.json"
      },
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "dynamic"
      }
    }
  }
}
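
Batch specs like the one above are submitted as tasks to the Overlord's task API. A minimal sketch, assuming the spec is saved in a hypothetical batch-ingestion.json file and the Overlord is reachable on its default port 8090 (the router can also proxy this API):

import json
import requests

# Default Overlord port (assumption; adjust if you submit via the router)
OVERLORD_TASK_URL = "http://localhost:8090/druid/indexer/v1/task"

# batch-ingestion.json is a hypothetical file holding the spec shown above
with open("batch-ingestion.json") as f:
    spec = json.load(f)

resp = requests.post(OVERLORD_TASK_URL, json=spec)
resp.raise_for_status()
print("Submitted task:", resp.json()["task"])  # the Overlord returns the new task id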

Real-time Ingestion

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "real_time_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["user_id", "event_type", "page_url"]
      },
      "metricsSpec": [
        { "type": "count", "name": "count" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "events",
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000
    }
  }
}
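
Kafka ingestion is managed by a long-running supervisor rather than a one-off task, so the spec above goes to the Overlord's supervisor API instead. A minimal sketch, again assuming the default Overlord port 8090 and a hypothetical kafka-supervisor.json file containing the spec:

import json
import requests

# Default Overlord port (assumption); supervisors manage streaming ingestion
OVERLORD_SUPERVISOR_URL = "http://localhost:8090/druid/indexer/v1/supervisor"

# kafka-supervisor.json is a hypothetical file holding the spec shown above
with open("kafka-supervisor.json") as f:
    spec = json.load(f)

resp = requests.post(OVERLORD_SUPERVISOR_URL, json=spec)
resp.raise_for_status()
print("Started supervisor:", resp.json()["id"])  # usually the dataSource name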

Query Language (Druid SQL)

Basic Queries

-- Simple SELECT
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
LIMIT 100;

-- Aggregation queries
SELECT
  region,
  COUNT(*) AS total_sales,
  SUM(revenue) AS total_revenue,
  AVG(revenue) AS avg_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region
ORDER BY total_revenue DESC;

-- Time-based aggregations
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) AS events_per_hour,
  SUM(revenue) AS hourly_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY TIME_FLOOR(__time, 'PT1H')
ORDER BY hour;
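
These queries run over HTTP against the broker's SQL endpoint, which also accepts dynamic parameters so that timestamps and other literals stay out of the query string. A minimal sketch, assuming the broker at its default port 8082; the parameter value format shown here is an assumption:

import requests

# Broker SQL endpoint (port 8082 as used elsewhere in this guide)
BROKER_SQL_URL = "http://localhost:8082/druid/v2/sql"

payload = {
    "query": """
        SELECT region, COUNT(*) AS total_sales, SUM(revenue) AS total_revenue
        FROM sales_events
        WHERE __time >= ?
        GROUP BY region
        ORDER BY total_revenue DESC
    """,
    # Dynamic parameters bind to the '?' placeholders in order
    "parameters": [
        {"type": "TIMESTAMP", "value": "2024-01-01 00:00:00"}
    ],
}

resp = requests.post(BROKER_SQL_URL, json=payload)
resp.raise_for_status()
for row in resp.json():
    print(row)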

Advanced Analytics

-- Window functions
SELECT
  customer_id,
  __time,
  revenue,
  SUM(revenue) OVER (
    PARTITION BY customer_id
    ORDER BY __time
    ROWS UNBOUNDED PRECEDING
  ) AS running_total
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
ORDER BY customer_id, __time;

-- Top N analysis
SELECT
  region,
  product_id,
  SUM(revenue) AS total_revenue
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region, product_id
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY region
  ORDER BY SUM(revenue) DESC
) <= 5
ORDER BY region, total_revenue DESC;

Native Druid Queries

{
  "queryType": "groupBy",
  "dataSource": "sales_events",
  "granularity": "day",
  "dimensions": ["region", "category"],
  "aggregations": [
    { "type": "count", "name": "count" },
    { "type": "doubleSum", "name": "revenue", "fieldName": "revenue" }
  ],
  "intervals": ["2024-01-01/2024-02-01"],
  "filter": {
    "type": "selector",
    "dimension": "region",
    "value": "North America"
  }
}
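
Native queries are POSTed as JSON to the broker's /druid/v2 endpoint rather than the SQL endpoint. A minimal sketch, assuming the groupBy query above is saved in a hypothetical groupby-query.json file and the broker runs on port 8082:

import json
import requests

# Native query endpoint on the broker (port 8082 as used elsewhere in this guide)
BROKER_NATIVE_URL = "http://localhost:8082/druid/v2"

# groupby-query.json is a hypothetical file holding the groupBy query shown above
with open("groupby-query.json") as f:
    query = json.load(f)

resp = requests.post(BROKER_NATIVE_URL, json=query)
resp.raise_for_status()
for row in resp.json():
    # groupBy results arrive as {"version", "timestamp", "event": {...}} objects
    print(row["timestamp"], row["event"])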

Performance Optimization

Query Optimization

-- Use time-based filters
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
  AND __time < TIMESTAMP '2024-02-01 00:00:00';

-- Use appropriate granularity
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) AS events
FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY TIME_FLOOR(__time, 'PT1H');

-- Use LIMIT for large result sets
SELECT * FROM sales_events
WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
LIMIT 1000;

Data Modeling

{
  "dimensionsSpec": {
    "dimensions": [
      { "type": "string", "name": "customer_id" },
      { "type": "string", "name": "product_id" },
      { "type": "long", "name": "timestamp" }
    ]
  },
  "metricsSpec": [
    { "type": "count", "name": "count" },
    { "type": "doubleSum", "name": "revenue", "fieldName": "amount" },
    { "type": "doubleMin", "name": "min_amount", "fieldName": "amount" },
    { "type": "doubleMax", "name": "max_amount", "fieldName": "amount" }
  ]
}
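
With rollup enabled, rows that share the same truncated timestamp and dimension values are collapsed into a single pre-aggregated row at ingestion time, using the metric definitions above. The following pandas sketch only illustrates that effect on hypothetical input rows; it is not Druid code:

import pandas as pd

# Illustrative raw events as they might arrive before ingestion
events = pd.DataFrame([
    {"timestamp": "2024-01-01T10:05:00", "customer_id": "c1", "product_id": "p1", "amount": 10.0},
    {"timestamp": "2024-01-01T10:40:00", "customer_id": "c1", "product_id": "p1", "amount": 15.0},
    {"timestamp": "2024-01-01T11:10:00", "customer_id": "c2", "product_id": "p1", "amount": 7.5},
])
events["timestamp"] = pd.to_datetime(events["timestamp"])

# Hourly rollup: truncate the timestamp, group by dimensions, aggregate metrics
rolled_up = (
    events.assign(hour=events["timestamp"].dt.floor("h"))
    .groupby(["hour", "customer_id", "product_id"])
    .agg(
        count=("amount", "size"),
        revenue=("amount", "sum"),
        min_amount=("amount", "min"),
        max_amount=("amount", "max"),
    )
    .reset_index()
)
print(rolled_up)  # the first two events collapse into one pre-aggregated row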

Monitoring and Maintenance

Health Checks

# Check coordinator status
curl -X GET http://localhost:8081/status

# Check historical status
curl -X GET http://localhost:8083/status

# Check broker status
curl -X GET http://localhost:8082/status

# Check middle manager status
curl -X GET http://localhost:8091/status

System Queries

-- Per-datasource segment counts and total size
SELECT
  datasource,
  COUNT(*) AS segments,
  SUM("size") AS size_bytes
FROM sys.segments
GROUP BY datasource
ORDER BY size_bytes DESC;

-- Recent ingestion tasks and their status
SELECT
  task_id,
  datasource,
  status,
  created_time,
  duration
FROM sys.tasks
ORDER BY created_time DESC;

Maintenance Tasks

# Kill long-running queries (requires the sqlQueryId set in the query context)
curl -X DELETE http://localhost:8082/druid/v2/sql/query-id

# Configure automatic compaction for a datasource (coordinator API)
curl -X POST http://localhost:8081/druid/coordinator/v1/config/compaction \
-H "Content-Type: application/json" \
-d '{"dataSource": "datasource-name"}'

# Kill tasks (Overlord API, default port 8090)
curl -X POST http://localhost:8090/druid/indexer/v1/task/task-id/shutdown
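
Before shutting tasks down, it helps to see what is actually running. A minimal sketch against the Overlord's task-listing API, assuming the same default port 8090 as above:

import requests

# Default Overlord port (assumption; matches the task API calls above)
OVERLORD_URL = "http://localhost:8090"

# List currently running ingestion tasks before killing or compacting anything
running = requests.get(f"{OVERLORD_URL}/druid/indexer/v1/runningTasks").json()
for task in running:
    print(task["id"], task.get("type"), task.get("dataSource"))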

Integration Examples

Python Integration

import requests
import pandas as pd

# Druid SQL endpoint on the broker
druid_url = "http://localhost:8082/druid/v2/sql"

# SQL query
query = {
    "query": """
        SELECT
          region,
          COUNT(*) AS total_sales,
          SUM(revenue) AS total_revenue
        FROM sales_events
        WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
        GROUP BY region
        ORDER BY total_revenue DESC
    """
}

# Execute query
response = requests.post(druid_url, json=query)
response.raise_for_status()
results = response.json()

# Convert to DataFrame
df = pd.DataFrame(results)
print(df)

Java Integration

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;

import java.util.List;
import java.util.Map;

public class DruidClient {
    private final OkHttpClient client = new OkHttpClient();
    private final ObjectMapper mapper = new ObjectMapper();
    private final String druidUrl = "http://localhost:8082/druid/v2/sql";

    public List<Map<String, Object>> executeQuery(String sql) throws Exception {
        String json = mapper.writeValueAsString(Map.of("query", sql));

        RequestBody body = RequestBody.create(
            MediaType.parse("application/json"), json
        );

        Request request = new Request.Builder()
            .url(druidUrl)
            .post(body)
            .build();

        try (Response response = client.newCall(request).execute()) {
            String responseBody = response.body().string();
            return mapper.readValue(responseBody, List.class);
        }
    }

    public static void main(String[] args) throws Exception {
        DruidClient client = new DruidClient();

        String sql = """
            SELECT region, COUNT(*) AS total_sales
            FROM sales_events
            WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
            GROUP BY region
            """;

        List<Map<String, Object>> results = client.executeQuery(sql);
        results.forEach(System.out::println);
    }
}

Best Practices

Data Modeling

  1. Choose Appropriate Granularity: Balance query performance vs storage
  2. Use Rollup Aggregations: Pre-compute common aggregations
  3. Optimize Dimensions: Keep dimension cardinality manageable
  4. Use Time-based Partitioning: Leverage Druid's time-series optimization

Performance

  1. Use Time Filters: Always include time-based filters in queries
  2. Limit Result Sets: Use LIMIT for large result sets
  3. Optimize Granularity: Choose appropriate query granularity
  4. Use Approximate Aggregations: Use hyperUnique / APPROX_COUNT_DISTINCT for high-cardinality dimensions (see the sketch below)
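
Druid SQL exposes approximate distinct counting as APPROX_COUNT_DISTINCT, which trades a small error bound for much lower memory use on high-cardinality dimensions. A minimal sketch run through the broker's SQL endpoint (port 8082 as used elsewhere in this guide):

import requests

# Broker SQL endpoint (port 8082 as used elsewhere in this guide)
BROKER_SQL_URL = "http://localhost:8082/druid/v2/sql"

# Approximate distinct count over a high-cardinality dimension
sql = """
    SELECT
      region,
      APPROX_COUNT_DISTINCT(customer_id) AS approx_unique_customers
    FROM sales_events
    WHERE __time >= TIMESTAMP '2024-01-01 00:00:00'
    GROUP BY region
"""

for row in requests.post(BROKER_SQL_URL, json={"query": sql}).json():
    print(row)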

Operations

  1. Monitor Segments: Regularly check segment health and size (see the sketch after this list)
  2. Compact Segments: Periodically compact small segments
  3. Tune Memory: Configure appropriate memory settings
  4. Backup Metadata: Regular backup of metadata storage
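
A minimal sketch of such a segment check, reusing the sys.segments query from the System Queries section and the broker SQL endpoint on port 8082:

import requests

# Broker SQL endpoint (port 8082 as used elsewhere in this guide)
BROKER_SQL_URL = "http://localhost:8082/druid/v2/sql"

# Per-datasource segment count and total size, as in the System Queries section
sql = """
    SELECT datasource, COUNT(*) AS segments, SUM("size") AS size_bytes
    FROM sys.segments
    GROUP BY datasource
    ORDER BY size_bytes DESC
"""

for row in requests.post(BROKER_SQL_URL, json={"query": sql}).json():
    print(row)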

Troubleshooting

Common Issues

  1. Ingestion Failures

    # Check task logs (Overlord API, default port 8090)
    curl -X GET http://localhost:8090/druid/indexer/v1/task/task-id/log

    # Check task status
    curl -X GET http://localhost:8090/druid/indexer/v1/task/task-id/status
  2. Query Performance Issues

    # Druid has no sys table for query history; enable request logging on the
    # broker (runtime.properties) and inspect the request logs for slow queries
    druid.request.logging.type=file
    druid.request.logging.dir=/var/log/druid/requests
  3. Memory Issues

    # Check JVM memory usage
    jstat -gc <druid_pid>

    # Check heap dump
    jmap -dump:format=b,file=heap.hprof <druid_pid>

Debug Mode

# Enable debug logging
export DRUID_LOG_LEVEL=DEBUG

# Check detailed logs
tail -f /var/log/druid/coordinator.log
tail -f /var/log/druid/historical.log
tail -f /var/log/druid/broker.log

Resources and References

Official Resources

  • Druid Console: Web-based management interface
  • Imply: Commercial Druid distribution with additional tools
  • Druid Connectors: Various data source connectors

Learning Resources

  • SQL Reference: Complete Druid SQL documentation
  • Performance Tuning Guide: Optimization best practices
  • Architecture Guide: Deep dive into Druid architecture