Introduction
A factory floor with 500 sensors is generating 2.6 billion data points per year. Every vibration reading, every temperature spike, every pressure anomaly is faithfully captured and stored. But when an engineer asks a straightforward question — “Show me all vibration anomalies from Building A’s CNC machines installed after 2023” — the team stares blankly at their screens. The data is there, scattered across three different systems, but nobody can answer that question in under ten minutes.
This scenario plays out in manufacturing plants, energy grids, building management systems, and IoT deployments worldwide. The root cause is always the same: the team treated metadata and time-series data as separate problems, and never designed the bridge between them.
In any industrial, manufacturing, or IoT system, you are dealing with two fundamentally different types of data that must work in concert. First, there is metadata — information about facilities, equipment, sensors, locations, configurations, maintenance history, and calibration records. This data is relational, hierarchical, and changes slowly. Second, there is time-series data — the actual sensor signals (temperature, vibration, pressure, torque, current, flow rate) streaming in at high frequency, sometimes thousands of readings per second. This data is append-only, voluminous, and indexed by time.
The relationship between these two data types is what makes everything work. A sensor reading of “47.3” means nothing without knowing that sensor S-0142 is a thermocouple mounted on a FANUC CNC spindle in Building A, calibrated last month, with an operating range of 15–85°C. The sensor_id is the bridge — metadata tells you what, time-series tells you when and how much.
Most teams get this relationship wrong. They embed metadata in every time-series row (creating massive bloat), or they completely separate the two without proper foreign keys (creating orphaned data), or they force everything into a single database that performs poorly on at least one workload. The result is the same: queries that should take milliseconds take minutes, data that should be connected is isolated, and engineers who should be finding anomalies are instead fighting with data infrastructure.
This guide is the definitive reference for designing a system that manages metadata and time-series data together correctly. We will walk through four architecture patterns, complete SQL schemas, Python code with SQLAlchemy and FastAPI, ingestion pipelines, query optimization strategies, and a real-world manufacturing example. By the end, you will have everything you need to build a system where that “CNC vibration anomalies in Building A” query returns results in under a second.
The Data Model Challenge
Before diving into solutions, let us clearly understand why these two data types are so difficult to manage together. They have fundamentally different characteristics, and a database architecture that is optimal for one is almost always suboptimal for the other.
Metadata: Relational, Hierarchical, Slowly Changing
Facility and sensor metadata follows a natural hierarchy. A typical industrial deployment looks like this:
Organization → Site → Building → Production Line → Machine → Component → Sensor
Each level in this hierarchy carries rich attributes. A sensor record might include: sensor type, unit of measurement, sampling rate in Hz, minimum and maximum operating range, calibration date, firmware version, installation date, and the equipment it is mounted on. A machine record includes manufacturer, model, serial number, commissioning date, maintenance schedule, and operating parameters.
This data is relational — sensors belong to equipment, equipment belongs to production lines, production lines belong to buildings. It is hierarchical — you often need to query “all sensors in Building A” which means traversing the tree. It is slowly changing — a sensor gets recalibrated, a machine gets moved to a different production line, firmware gets updated. And it is schema-rich — each entity type has many attributes with different data types, constraints, and relationships.
Time-Series: Append-Only, High Volume, Time-Indexed
Sensor readings are the opposite in nearly every way. A typical reading is just three fields: timestamp, sensor_id, and value. Maybe a few additional channels for multi-axis sensors (x, y, z for accelerometers). The schema is narrow and rarely changes.
But the volume is enormous. A single vibration sensor sampling at 1 kHz generates 86.4 million readings per day. Even at a modest 1 Hz sampling rate, 500 sensors produce 43.2 million readings per day — roughly 15.8 billion per year. This data is append-only (you almost never update a historical reading), time-indexed (every query includes a time range), and write-heavy (ingestion throughput is critical).
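These volume figures follow from simple arithmetic that is worth keeping at hand when sizing a new deployment. A throwaway helper (illustrative only, not tied to any particular stack) reproduces them:

```python
def readings_per_day(sensor_count: int, hz: float) -> int:
    """Total readings per day for a fleet sampling at a uniform rate."""
    return int(sensor_count * hz * 86_400)

def readings_per_year(sensor_count: int, hz: float) -> int:
    """Daily volume extrapolated over a 365-day year."""
    return readings_per_day(sensor_count, hz) * 365

# One vibration sensor at 1 kHz: 86.4 million readings per day.
# 500 sensors at 1 Hz: 43.2 million per day, roughly 15.8 billion per year.
```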
Characteristics Comparison
| Characteristic | Metadata | Time-Series |
|---|---|---|
| Schema | Wide, complex, many tables | Narrow (timestamp, id, value) |
| Volume | Thousands to millions of rows | Billions to trillions of rows |
| Write pattern | Infrequent updates, inserts | Continuous high-throughput appends |
| Read pattern | Lookups, JOINs, tree traversal | Range scans by time, aggregations |
| Relationships | Rich foreign keys, hierarchies | Single FK (sensor_id) |
| Mutability | Updates and deletes common | Append-only, rarely modified |
| Indexing | B-tree, GIN, full-text | Time-partitioned, BRIN |
| Retention | Keep forever | Tiered (raw → downsampled → archived) |
Common Mistakes
Teams typically fall into one of three traps:
Mistake 1: Embedding metadata in every time-series row. Instead of storing (timestamp, sensor_id, value), they store (timestamp, sensor_id, value, building_name, machine_name, manufacturer, sensor_type, unit, ...). A row that should be 24 bytes becomes 500 bytes. With billions of rows, this means terabytes of redundant data, slower queries, and a nightmare when metadata changes (do you backfill every historical row?).
Mistake 2: Complete separation without proper linking. Metadata lives in PostgreSQL, time-series lives in InfluxDB, and the only link is a sensor name string that someone typed manually. Sensor names get changed, new sensors get added to the time-series database without being registered in the metadata database, and suddenly 15% of your readings are orphaned — you have data from sensors that do not exist in your metadata system.
Mistake 3: Using one database for everything. Forcing all data into plain PostgreSQL means time-series queries are slow (no time partitioning, no columnar compression). Forcing everything into InfluxDB makes metadata queries impractical (no JOINs, no foreign keys, no transactions). Neither database excels at the other’s workload.
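The cost of Mistake 1 in particular is easy to quantify. Using the illustrative 24-byte versus 500-byte row sizes above, a quick sketch shows the scale of the waste:

```python
def raw_table_gb(rows: int, row_bytes: int) -> float:
    """Approximate raw table size in GB (ignores page overhead and indexes)."""
    return rows * row_bytes / 1e9

narrow = raw_table_gb(10_000_000_000, 24)    # (time, sensor_id, value) only
bloated = raw_table_gb(10_000_000_000, 500)  # metadata embedded in every row
# 10 billion rows: roughly 240 GB narrow vs. 5,000 GB (5 TB) with embedded metadata
```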
sensor_id is the bridge between metadata and time-series. Your architecture must make it easy to start from either side — filter by metadata attributes and then fetch time-series, or detect time-series anomalies and then look up metadata context.
Architecture Patterns
There is no single “right” architecture for combining metadata and time-series data. The best choice depends on your scale, team expertise, existing infrastructure, and query patterns. Here are four proven patterns, from the most commonly recommended to the most specialized.
Pattern 1: PostgreSQL + TimescaleDB (Recommended)
This is the pattern I recommend for most teams, and the one we will spend the most time on. TimescaleDB is a PostgreSQL extension that adds time-series capabilities — hypertables, automatic partitioning by time, continuous aggregates, and compression — while keeping full PostgreSQL functionality. Because it runs inside PostgreSQL, you get native SQL JOINs between your metadata tables and your time-series hypertables.
Here is the complete schema:
-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- ============================================
-- METADATA TABLES
-- ============================================
CREATE TABLE facilities (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
location VARCHAR(500),
facility_type VARCHAR(50) NOT NULL, -- 'manufacturing', 'warehouse', 'office'
commissioned_date DATE,
status VARCHAR(20) DEFAULT 'active',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE equipment (
id SERIAL PRIMARY KEY,
facility_id INTEGER NOT NULL REFERENCES facilities(id),
name VARCHAR(200) NOT NULL,
equipment_type VARCHAR(50) NOT NULL, -- 'cnc', 'robot', 'conveyor', 'pump'
manufacturer VARCHAR(200),
model VARCHAR(200),
serial_number VARCHAR(100) UNIQUE,
install_date DATE,
production_line VARCHAR(100),
status VARCHAR(20) DEFAULT 'operational',
operating_params JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_equipment_facility ON equipment(facility_id);
CREATE INDEX idx_equipment_type ON equipment(equipment_type);
CREATE INDEX idx_equipment_manufacturer ON equipment(manufacturer);
CREATE INDEX idx_equipment_line ON equipment(production_line);
CREATE TABLE sensors (
id SERIAL PRIMARY KEY,
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
name VARCHAR(200) NOT NULL,
sensor_type VARCHAR(50) NOT NULL, -- 'temperature', 'vibration', 'pressure'
unit VARCHAR(20) NOT NULL, -- 'celsius', 'mm/s', 'bar', 'A'
sampling_rate_hz REAL DEFAULT 1.0,
min_range REAL,
max_range REAL,
calibration_date DATE,
firmware_version VARCHAR(50),
is_active BOOLEAN DEFAULT TRUE,
tags JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_sensors_equipment ON sensors(equipment_id);
CREATE INDEX idx_sensors_type ON sensors(sensor_type);
CREATE INDEX idx_sensors_active ON sensors(is_active) WHERE is_active = TRUE;
CREATE INDEX idx_sensors_tags ON sensors USING GIN(tags);
CREATE TABLE maintenance_logs (
id SERIAL PRIMARY KEY,
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
maintenance_type VARCHAR(50) NOT NULL, -- 'preventive', 'corrective', 'calibration'
description TEXT,
performed_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
technician VARCHAR(200),
parts_replaced JSONB DEFAULT '[]',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_maintenance_equipment ON maintenance_logs(equipment_id);
CREATE INDEX idx_maintenance_time ON maintenance_logs(performed_at);
-- ============================================
-- TIME-SERIES TABLES (TimescaleDB Hypertables)
-- ============================================
CREATE TABLE sensor_readings (
time TIMESTAMPTZ NOT NULL,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
value DOUBLE PRECISION NOT NULL
);
SELECT create_hypertable('sensor_readings', 'time');
CREATE INDEX idx_readings_sensor_time ON sensor_readings (sensor_id, time DESC);
-- Enable compression (after 7 days)
ALTER TABLE sensor_readings SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id',
timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');
-- Anomaly events table
CREATE TABLE anomaly_events (
id SERIAL PRIMARY KEY,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ,
anomaly_type VARCHAR(50) NOT NULL, -- 'threshold', 'trend', 'pattern'
severity VARCHAR(20) NOT NULL, -- 'low', 'medium', 'high', 'critical'
value_at_detection DOUBLE PRECISION,
model_version VARCHAR(50),
notes TEXT,
acknowledged BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_anomaly_sensor ON anomaly_events(sensor_id);
CREATE INDEX idx_anomaly_time ON anomaly_events(start_time);
The compress_segmentby = 'sensor_id' setting is critical. It tells TimescaleDB to group compressed data by sensor, which means queries filtered by sensor_id only decompress the relevant segments. Without this, every query would decompress entire chunks.
Now let us see the power of native JOINs. Here are queries that cross the metadata/time-series boundary effortlessly:
-- Query 1: Average temperature for all sensors in Building A, last 24 hours
SELECT
f.name AS facility,
e.name AS equipment,
s.name AS sensor,
AVG(r.value) AS avg_temp,
MIN(r.value) AS min_temp,
MAX(r.value) AS max_temp
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
AND s.sensor_type = 'temperature'
AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY f.name, e.name, s.name
ORDER BY avg_temp DESC;
-- Query 2: FANUC machines with vibration exceeding threshold
SELECT
e.name AS machine,
e.model,
s.name AS sensor,
s.max_range AS threshold,
MAX(r.value) AS peak_vibration,
COUNT(*) AS exceedance_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.manufacturer = 'FANUC'
AND s.sensor_type = 'vibration'
AND r.value > s.max_range
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, e.model, s.name, s.max_range
ORDER BY peak_vibration DESC;
-- Query 3: Compare vibration across CNC machines on Production Line 3
SELECT
e.name AS machine,
time_bucket('1 hour', r.time) AS hour,
AVG(r.value) AS avg_vibration,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY r.value) AS p95_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
AND e.equipment_type = 'cnc'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, hour
ORDER BY e.name, hour;
Notice how each query seamlessly combines metadata filters (facility name, manufacturer, production line, sensor type) with time-series operations (time ranges, aggregations, percentiles). This is the primary advantage of the PostgreSQL + TimescaleDB pattern — a single SQL statement can traverse the entire data model.
Pattern 2: PostgreSQL + InfluxDB
When InfluxDB is already part of your stack, or when write throughput exceeds what PostgreSQL can handle (generally above 500K inserts/second on a single node), a split architecture makes sense. Metadata stays in PostgreSQL, time-series goes to InfluxDB, and your application performs the JOIN.
import asyncpg
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
class DualDatabaseQuery:
def __init__(self, pg_dsn: str, influx_url: str, influx_token: str, influx_org: str):
self.pg_dsn = pg_dsn
self.influx = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
self.query_api = self.influx.query_api()
async def get_readings_by_facility(
self, facility_name: str, sensor_type: str, hours: int = 24
):
# Step 1: Query metadata from PostgreSQL
conn = await asyncpg.connect(self.pg_dsn)
sensors = await conn.fetch("""
SELECT s.id, s.name, e.name AS equipment_name
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = $1 AND s.sensor_type = $2 AND s.is_active = TRUE
""", facility_name, sensor_type)
await conn.close()
if not sensors:
return []
# Step 2: Query time-series from InfluxDB, filtered by sensor IDs
sensor_ids = [str(s['id']) for s in sensors]
sensor_filter = ' or '.join(
f'r["sensor_id"] == "{sid}"' for sid in sensor_ids
)
flux_query = f'''
from(bucket: "sensor_data")
|> range(start: -{hours}h)
|> filter(fn: (r) => r["_measurement"] == "readings")
|> filter(fn: (r) => {sensor_filter})
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
'''
tables = self.query_api.query(flux_query)
# Step 3: Merge metadata with time-series results
sensor_lookup = {str(s['id']): s for s in sensors}
results = []
for table in tables:
for record in table.records:
sid = record.values.get("sensor_id")
meta = sensor_lookup.get(sid, {})
results.append({
"time": record.get_time(),
"sensor_id": sid,
"sensor_name": meta.get("name"),
"equipment": meta.get("equipment_name"),
"value": record.get_value(),
})
return results
The PostgreSQL + InfluxDB pattern works, but you lose the elegance of native JOINs. Every cross-domain query requires two round-trips, and complex queries (like “compare vibration patterns across machines by manufacturer”) require substantial application-level logic. Use this pattern when you already have InfluxDB in production and migration is not feasible, or when your write throughput genuinely exceeds PostgreSQL/TimescaleDB limits.
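Stripped of the database clients, Step 3 above is just an in-memory hash join keyed on sensor_id. A minimal standalone sketch of that merge (field names are illustrative and mirror the example above):

```python
def merge_readings(readings: list[dict], sensors: list[dict]) -> list[dict]:
    """Hash-join time-series rows onto metadata rows via sensor_id."""
    lookup = {s["id"]: s for s in sensors}
    merged = []
    for r in readings:
        meta = lookup.get(r["sensor_id"], {})  # missing metadata yields None fields
        merged.append({
            **r,
            "sensor_name": meta.get("name"),
            "equipment": meta.get("equipment_name"),
        })
    return merged

rows = merge_readings(
    [{"sensor_id": 1, "value": 4.5}],
    [{"id": 1, "name": "vib_x", "equipment_name": "CNC-001"}],
)
# rows[0] carries both the reading and its metadata context
```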
Pattern 3: PostgreSQL + Parquet/Iceberg on S3
For very large-scale deployments (terabytes of time-series data) or when the primary consumer is batch ML training pipelines, storing time-series data as Parquet files on S3 is cost-effective and scalable. Metadata stays in PostgreSQL, and you join them at query time using DuckDB, Athena, or Spark.
import duckdb
import asyncpg
from pathlib import Path
class ParquetTimeSeriesQuery:
"""
Time-series stored as Parquet files on S3, partitioned by:
s3://data-lake/sensor_readings/sensor_id={id}/date={YYYY-MM-DD}/data.parquet
"""
def __init__(self, pg_dsn: str, s3_base: str):
self.pg_dsn = pg_dsn
self.s3_base = s3_base
self.duck = duckdb.connect()
self.duck.execute("INSTALL httpfs; LOAD httpfs;")
self.duck.execute("SET s3_region='us-east-1';")
async def query_with_metadata(
self, facility_name: str, sensor_type: str, start_date: str, end_date: str
):
# Step 1: Get relevant sensor IDs from PostgreSQL
conn = await asyncpg.connect(self.pg_dsn)
sensors = await conn.fetch("""
SELECT s.id, s.name, s.unit, e.name AS equipment,
e.manufacturer, f.name AS facility
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = $1 AND s.sensor_type = $2
""", facility_name, sensor_type)
await conn.close()
# Step 2: Build Parquet glob paths for relevant sensors
sensor_ids = [s['id'] for s in sensors]
paths = [
f"{self.s3_base}/sensor_id={sid}/date=*/data.parquet"
for sid in sensor_ids
]
# Step 3: Query with DuckDB
result = self.duck.execute(f"""
SELECT
sensor_id,
date_trunc('hour', time) AS hour,
AVG(value) AS avg_value,
MAX(value) AS max_value,
COUNT(*) AS reading_count
            FROM parquet_scan({paths}, hive_partitioning=true)  -- expose sensor_id from the path
WHERE time BETWEEN '{start_date}' AND '{end_date}'
GROUP BY sensor_id, hour
ORDER BY sensor_id, hour
""").fetchdf()
# Step 4: Merge with metadata
sensor_lookup = {s['id']: dict(s) for s in sensors}
result['equipment'] = result['sensor_id'].map(
lambda sid: sensor_lookup.get(sid, {}).get('equipment')
)
result['facility'] = result['sensor_id'].map(
lambda sid: sensor_lookup.get(sid, {}).get('facility')
)
return result
This pattern is best for data lakes and ML training pipelines where you need to process large volumes of historical data cost-effectively. Parquet’s columnar format provides excellent compression (10-20x vs CSV), and partitioning by sensor_id and date means queries only read relevant files. However, it is poorly suited for real-time queries or dashboards that need sub-second response times.
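Partition pruning is what keeps this pattern fast: because the layout encodes sensor_id and date in the key, you can enumerate exactly the files a query needs before touching S3. A sketch of that path construction (the layout matches the docstring in the class above):

```python
from datetime import date, timedelta

def partition_paths(base: str, sensor_ids: list[int],
                    start: date, end: date) -> list[str]:
    """Enumerate Parquet partition paths for the given sensors and date range."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    return [f"{base}/sensor_id={sid}/date={d.isoformat()}/data.parquet"
            for sid in sensor_ids for d in days]

paths = partition_paths("s3://data-lake/sensor_readings", [42, 43],
                        date(2024, 1, 1), date(2024, 1, 3))
# 2 sensors x 3 days = 6 files to scan instead of the whole lake
```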
Pattern 4: TDengine Super Tables
TDengine takes a radically different approach. Its “super table” concept embeds metadata as tags directly alongside time-series data. Each physical sensor gets a sub-table that inherits from a super table, and tags (metadata) are stored only once per sub-table, not repeated in every row.
-- Create a super table with tags (metadata) and columns (time-series)
CREATE STABLE sensor_readings (
ts TIMESTAMP,
value DOUBLE,
quality INT
) TAGS (
facility NCHAR(200),
building NCHAR(100),
equipment NCHAR(200),
manufacturer NCHAR(200),
sensor_type NCHAR(50),
unit NCHAR(20),
line NCHAR(100)
);
-- Create sub-tables for each sensor (tags are set once)
CREATE TABLE sensor_0001 USING sensor_readings TAGS (
'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'vibration', 'mm/s', 'Line 3'
);
CREATE TABLE sensor_0002 USING sensor_readings TAGS (
'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'temperature', 'celsius', 'Line 3'
);
-- Insert data (just timestamp + values, no metadata repetition)
INSERT INTO sensor_0001 VALUES (NOW(), 4.52, 100);
INSERT INTO sensor_0002 VALUES (NOW(), 67.3, 100);
-- Query across all sensors using metadata tags
SELECT
facility,
equipment,
AVG(value) AS avg_vibration
FROM sensor_readings
WHERE sensor_type = 'vibration'
AND facility = 'Plant Chicago'
AND ts > NOW() - 24h
GROUP BY facility, equipment;
TDengine’s approach is elegant for IoT: the metadata is right there with the data, tags are indexed automatically, and you do not need a separate metadata database. The downside is that complex metadata relationships (maintenance logs, calibration history, hierarchical queries) are difficult to model with flat tags. If your metadata is simple and relatively static, TDengine is worth considering. If you need rich relational metadata, stick with Pattern 1 or 2.
Pattern Comparison
| Criteria | PG + TimescaleDB | PG + InfluxDB | PG + Parquet/S3 | TDengine |
|---|---|---|---|---|
| Complexity | Low | Medium | Medium-High | Low |
| Native JOINs | Yes | No (app-level) | No (query engine) | Tags only |
| Write throughput | 100K-500K rows/s | 1M+ rows/s | Batch (unlimited) | 1M+ rows/s |
| Query flexibility | Full SQL | Flux + SQL | SQL (DuckDB/Athena) | SQL subset |
| Metadata richness | Full relational | Full relational | Full relational | Flat tags only |
| Scalability | TB scale | TB scale | PB scale | TB scale |
| Best for | Most teams | Existing InfluxDB | Data lakes, ML | Simple IoT |
Detailed Schema Design Best Practices
Regardless of which architecture pattern you choose, certain schema design principles apply universally. Let us walk through the most important ones.
Hierarchical Facility Modeling
Facility hierarchies are inherently tree-structured. You need to efficiently answer queries like “give me all sensors in Building A,” which means finding every piece of equipment on every production line in that building. There are two good approaches in PostgreSQL.
Approach 1: The ltree extension
CREATE EXTENSION IF NOT EXISTS ltree;
-- Add a path column to each entity
ALTER TABLE facilities ADD COLUMN path ltree;
ALTER TABLE equipment ADD COLUMN path ltree;
ALTER TABLE sensors ADD COLUMN path ltree;
-- Example paths
-- Facility: 'org.chicago'
-- Equipment: 'org.chicago.building_a.line_3.cnc_001'
-- Sensor: 'org.chicago.building_a.line_3.cnc_001.vibration_x'
CREATE INDEX idx_facility_path ON facilities USING GIST(path);
CREATE INDEX idx_equipment_path ON equipment USING GIST(path);
CREATE INDEX idx_sensor_path ON sensors USING GIST(path);
-- Find all sensors under Building A (any depth)
SELECT s.* FROM sensors s
WHERE s.path <@ 'org.chicago.building_a';
-- Find all equipment exactly 2 levels below org.chicago
SELECT e.* FROM equipment e
WHERE e.path ~ 'org.chicago.*{2}';
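ltree labels are restricted to letters, digits, and underscores, so display names like "Building A" or "CNC-001" must be normalized before they become path components. A small helper sketch (the naming convention here is an assumption; adapt it to your own):

```python
import re

def to_ltree_label(name: str) -> str:
    """Normalize a display name into a valid ltree label."""
    label = re.sub(r"[^A-Za-z0-9]+", "_", name.strip()).strip("_").lower()
    if not label:
        raise ValueError(f"cannot derive ltree label from {name!r}")
    return label

def build_path(*names: str) -> str:
    """Join normalized labels into a dotted ltree path."""
    return ".".join(to_ltree_label(n) for n in names)

# build_path("Org", "Chicago", "Building A", "Line 3", "CNC-001")
# yields 'org.chicago.building_a.line_3.cnc_001'
```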
Approach 2: Recursive CTEs with adjacency list
If you prefer not to use extensions, recursive CTEs work well for moderate-sized hierarchies:
-- Adjacency lists need a self-reference on facilities; assume one was added:
-- ALTER TABLE facilities ADD COLUMN parent_id INTEGER REFERENCES facilities(id);
-- Find all equipment under Building A, including nested sub-facilities
WITH RECURSIVE facility_tree AS (
    -- Base case: the target facility
    SELECT id, name
    FROM facilities
    WHERE name = 'Building A'
    UNION ALL
    -- Recursive case: facilities nested under those already found
    SELECT f.id, f.name
    FROM facilities f
    JOIN facility_tree ft ON f.parent_id = ft.id
)
SELECT e.*
FROM equipment e
JOIN facility_tree ft ON e.facility_id = ft.id;
Slowly Changing Dimensions (SCD Type 2)
Equipment moves between production lines. Sensors get recalibrated. Firmware gets updated. If you simply overwrite the old value, you lose the ability to correctly interpret historical data. A vibration reading from last month should be evaluated against the calibration that was active at that time, not today's calibration.
SCD Type 2 solves this by keeping a history of changes with effective date ranges:
CREATE TABLE sensor_history (
id SERIAL PRIMARY KEY,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
calibration_date DATE,
min_range REAL,
max_range REAL,
firmware_version VARCHAR(50),
effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
effective_to TIMESTAMPTZ, -- NULL means "current"
is_current BOOLEAN DEFAULT TRUE
);
CREATE INDEX idx_sensor_history_current
ON sensor_history(sensor_id) WHERE is_current = TRUE;
CREATE INDEX idx_sensor_history_range
ON sensor_history(sensor_id, effective_from, effective_to);
-- When recalibrating a sensor:
-- Step 1: Close the current record
UPDATE sensor_history
SET effective_to = NOW(), is_current = FALSE
WHERE sensor_id = 42 AND is_current = TRUE;
-- Step 2: Insert new record
INSERT INTO sensor_history
(sensor_id, equipment_id, calibration_date, min_range, max_range,
firmware_version, effective_from, is_current)
VALUES
(42, 15, '2026-04-01', 0, 100, 'v3.2.1', NOW(), TRUE);
-- Query: What was the calibration when this anomaly was detected?
SELECT sh.*
FROM sensor_history sh
JOIN anomaly_events ae ON ae.sensor_id = sh.sensor_id
WHERE ae.id = 789
AND ae.start_time BETWEEN sh.effective_from
AND COALESCE(sh.effective_to, '9999-12-31'::timestamptz);
JSONB for Flexible Attributes
Not every piece of equipment has the same attributes. A CNC machine has spindle speed and tool count, a conveyor has belt speed and length, a robot has axis count and payload capacity. Rather than creating separate tables for each equipment type, use JSONB columns for type-specific attributes:
-- Equipment with flexible operating parameters
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer,
model, operating_params)
VALUES
(1, 'CNC-001', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', '{
"max_spindle_rpm": 24000,
"tool_capacity": 21,
"axes": 5,
"max_feed_rate_mm_min": 54000
}'::jsonb),
(1, 'Robot-001', 'robot', 'ABB', 'IRB 6700', '{
"axes": 6,
"payload_kg": 150,
"reach_mm": 2650,
"repeatability_mm": 0.05
}'::jsonb);
-- Query: Find all robots with payload > 100kg
SELECT name, model, operating_params->>'payload_kg' AS payload
FROM equipment
WHERE equipment_type = 'robot'
AND (operating_params->>'payload_kg')::numeric > 100;
-- Index for fast JSONB queries
CREATE INDEX idx_equipment_params ON equipment USING GIN(operating_params);
Tagging System for Ad-Hoc Grouping
Beyond the formal hierarchy, teams often need to group sensors by arbitrary criteria: "all sensors involved in the Q1 reliability study," "sensors monitored by the ML anomaly detection model," or "critical sensors requiring 24/7 alerting." A flexible tagging system supports this:
-- Sensors table already has a JSONB 'tags' column
-- Usage examples:
UPDATE sensors SET tags = '{
"monitoring_group": "critical_24x7",
"ml_model": "vibration_anomaly_v2",
"study": "q1_reliability",
"zone": "high_temperature"
}'::jsonb
WHERE id = 42;
-- Find all sensors in a monitoring group
SELECT s.*, e.name AS equipment
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
WHERE s.tags @> '{"monitoring_group": "critical_24x7"}';
-- Find sensors enrolled in a specific ML model
SELECT s.id, s.name, s.sensor_type
FROM sensors s
WHERE s.tags @> '{"ml_model": "vibration_anomaly_v2"}';
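PostgreSQL's @> containment operator has a direct in-memory analogue, which is handy when sensor rows have already been fetched and you want the same filter semantics in application code. A sketch for the flat, single-level tag objects used above:

```python
def tags_contain(tags: dict, query: dict) -> bool:
    """Mimic PostgreSQL's JSONB @> containment for flat tag objects."""
    return all(tags.get(k) == v for k, v in query.items())

sensors = [
    {"id": 42, "tags": {"monitoring_group": "critical_24x7", "zone": "high_temperature"}},
    {"id": 43, "tags": {"monitoring_group": "standard"}},
]
critical = [s for s in sensors
            if tags_contain(s["tags"], {"monitoring_group": "critical_24x7"})]
# only sensor 42 matches
```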
Data Ingestion Pipeline
Getting data from sensors into your database reliably is half the battle. A production ingestion pipeline typically follows this path:
Sensors → MQTT/Modbus → Kafka/MQTT Broker → Telegraf or Custom Consumer → Database
Telegraf Configuration
Telegraf is a popular agent for collecting and forwarding sensor data. Here is a configuration that reads from MQTT, enriches with metadata tags, and writes to TimescaleDB:
# telegraf.conf
[[inputs.mqtt_consumer]]
servers = ["tcp://mqtt-broker:1883"]
topics = ["sensors/+/readings"]
data_format = "json"
tag_keys = ["sensor_id"]
json_time_key = "timestamp"
json_time_format = "2006-01-02T15:04:05Z07:00"
# Enrich with metadata from a lookup file (updated periodically)
[[processors.enum]]
[[processors.enum.mapping]]
tag = "sensor_id"
dest = "sensor_type"
[processors.enum.mapping.value_mappings]
"S-0001" = "vibration"
"S-0002" = "temperature"
[[outputs.postgresql]]
  connection = "postgres://user:pass@localhost/sensordb"
  # The plugin writes each measurement to a table of the same name and puts
  # the metric timestamp in a "time" column, so publishing the MQTT data as a
  # "sensor_readings" measurement routes rows into the hypertable directly.
Python Ingestion Script with Validation
For more control, a custom Python ingestion script can validate sensor IDs against metadata, handle errors, and batch inserts:
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Optional
import asyncpg
import aiomqtt
logger = logging.getLogger(__name__)
class SensorDataIngester:
"""Ingests sensor readings with metadata validation."""
def __init__(self, pg_dsn: str, mqtt_host: str, mqtt_port: int = 1883):
self.pg_dsn = pg_dsn
self.mqtt_host = mqtt_host
self.mqtt_port = mqtt_port
self.pool: Optional[asyncpg.Pool] = None
self.valid_sensors: set[int] = set()
self.batch: list[tuple] = []
self.batch_size = 1000
self.flush_interval = 5 # seconds
async def start(self):
"""Initialize connections and start ingestion."""
self.pool = await asyncpg.create_pool(self.pg_dsn, min_size=2, max_size=10)
await self._load_valid_sensors()
# Run batch flusher and MQTT listener concurrently
await asyncio.gather(
self._mqtt_listener(),
self._periodic_flush(),
self._periodic_sensor_refresh(),
)
async def _load_valid_sensors(self):
"""Load active sensor IDs from metadata database."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id FROM sensors WHERE is_active = TRUE"
)
self.valid_sensors = {row['id'] for row in rows}
logger.info(f"Loaded {len(self.valid_sensors)} active sensors")
async def _periodic_sensor_refresh(self):
"""Refresh valid sensor list every 5 minutes."""
while True:
await asyncio.sleep(300)
await self._load_valid_sensors()
async def _mqtt_listener(self):
"""Listen for sensor readings on MQTT."""
async with aiomqtt.Client(self.mqtt_host, self.mqtt_port) as client:
await client.subscribe("sensors/+/readings")
async for message in client.messages:
try:
payload = json.loads(message.payload)
sensor_id = int(payload['sensor_id'])
# Validate against metadata
if sensor_id not in self.valid_sensors:
logger.warning(
f"Rejected reading from unknown sensor {sensor_id}"
)
continue
timestamp = datetime.fromisoformat(payload['timestamp'])
if timestamp.tzinfo is None:
timestamp = timestamp.replace(tzinfo=timezone.utc)
value = float(payload['value'])
self.batch.append((timestamp, sensor_id, value))
if len(self.batch) >= self.batch_size:
await self._flush_batch()
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.error(f"Invalid message: {e}")
async def _periodic_flush(self):
"""Flush batch at regular intervals."""
while True:
await asyncio.sleep(self.flush_interval)
if self.batch:
await self._flush_batch()
async def _flush_batch(self):
"""Insert batch of readings into TimescaleDB."""
if not self.batch:
return
batch_to_insert = self.batch.copy()
self.batch.clear()
try:
async with self.pool.acquire() as conn:
await conn.executemany(
"""INSERT INTO sensor_readings (time, sensor_id, value)
VALUES ($1, $2, $3)""",
batch_to_insert
)
logger.info(f"Inserted {len(batch_to_insert)} readings")
except Exception as e:
logger.error(f"Batch insert failed: {e}")
            # Re-add failed batch for retry on the next flush; in production,
            # bound this buffer so a prolonged outage cannot exhaust memory
            self.batch.extend(batch_to_insert)
# Data quality checks
async def check_data_quality(pool: asyncpg.Pool):
"""Detect common data quality issues."""
async with pool.acquire() as conn:
        # Orphaned readings (sensor_id missing from sensors); only possible
        # if the FK on sensor_readings was dropped for ingest throughput
orphaned = await conn.fetchval("""
SELECT COUNT(DISTINCT r.sensor_id)
FROM sensor_readings r
LEFT JOIN sensors s ON s.id = r.sensor_id
WHERE s.id IS NULL
AND r.time > NOW() - INTERVAL '24 hours'
""")
# Sensors with no recent readings (possible failure)
silent = await conn.fetch("""
SELECT s.id, s.name, e.name AS equipment,
MAX(r.time) AS last_reading
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
LEFT JOIN sensor_readings r ON r.sensor_id = s.id
AND r.time > NOW() - INTERVAL '24 hours'
WHERE s.is_active = TRUE
GROUP BY s.id, s.name, e.name
HAVING MAX(r.time) IS NULL
OR MAX(r.time) < NOW() - INTERVAL '1 hour'
""")
# Sensors with values outside their calibrated range
out_of_range = await conn.fetch("""
SELECT s.id, s.name, s.min_range, s.max_range,
MIN(r.value) AS min_val, MAX(r.value) AS max_val,
COUNT(*) AS violation_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
WHERE r.time > NOW() - INTERVAL '24 hours'
AND (r.value < s.min_range OR r.value > s.max_range)
GROUP BY s.id, s.name, s.min_range, s.max_range
""")
return {
"orphaned_sensor_ids": orphaned,
"silent_sensors": [dict(r) for r in silent],
"out_of_range_sensors": [dict(r) for r in out_of_range],
}
The _load_valid_sensors() method caches active sensor IDs in memory and refreshes them every five minutes. This avoids a database round-trip for every incoming message while still catching new sensor registrations within a reasonable window.
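The caching idea is easy to isolate and test on its own. Here is a minimal sketch of a TTL cache around an injected loader callable; the class name, the loader signature, and the 300-second default are illustrative assumptions, not part of the article's pipeline:

```python
import time

class SensorIDCache:
    """Caches the set of valid sensor IDs, refreshing once a TTL expires."""

    def __init__(self, loader, ttl_seconds: float = 300.0):
        self._loader = loader          # callable returning an iterable of sensor IDs
        self._ttl = ttl_seconds
        self._ids: set[int] = set()
        self._loaded_at: float = float("-inf")  # force a load on first use

    def is_valid(self, sensor_id: int) -> bool:
        # Refresh from the loader when the cached set is older than the TTL.
        if time.monotonic() - self._loaded_at > self._ttl:
            self._ids = set(self._loader())
            self._loaded_at = time.monotonic()
        return sensor_id in self._ids

# In the real pipeline the loader would run SELECT id FROM sensors WHERE is_active
cache = SensorIDCache(loader=lambda: {101, 102, 103}, ttl_seconds=300)
```

The validator then calls cache.is_valid(sensor_id) on every message; only one call per five minutes actually touches the database.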
Handling Late-Arriving and Out-of-Order Data
In real-world deployments, data does not always arrive in order. Network delays, edge device buffering, and batch uploads from remote sites all produce out-of-order events. TimescaleDB handles this gracefully — inserts are not required to be in time order. However, if you are using continuous aggregates or materialized views, you need to configure a refresh policy that covers the maximum expected delay:
-- Continuous aggregate that tolerates late data (up to 1 hour)
CREATE MATERIALIZED VIEW hourly_averages
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
sensor_id,
AVG(value) AS avg_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;
-- Refresh policy: refresh the last 2 hours every 30 minutes
SELECT add_continuous_aggregate_policy('hourly_averages',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '30 minutes',
schedule_interval => INTERVAL '30 minutes'
);
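One way to reason about the policy above: a late reading only makes it into the materialized aggregate if it lands inside the span the refresh job still revisits. A small helper makes the rule explicit (the function name is mine, not a TimescaleDB API):

```python
from datetime import datetime, timedelta, timezone

def covered_by_refresh(event_time: datetime, now: datetime,
                       start_offset: timedelta, end_offset: timedelta) -> bool:
    """True if a reading at event_time falls inside
    [now - start_offset, now - end_offset), the window the
    continuous-aggregate refresh policy will rematerialize."""
    return now - start_offset <= event_time < now - end_offset

now = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
# A reading 90 minutes late is still inside a 2-hour start_offset...
late = now - timedelta(minutes=90)
# ...but one 3 hours late has aged out of the refresh window.
very_late = now - timedelta(hours=3)
```

This is why start_offset must exceed your maximum expected delivery delay: anything older than it will never be folded into the aggregate by the scheduled policy and would need a manual refresh.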
Querying Across Metadata and Time-Series
The true value of a well-designed schema emerges when you start writing queries that cross the metadata/time-series boundary. Here are five common query patterns with complete SQL and Python implementations.
All Readings by Location and Sensor Type
-- All vibration readings from sensors in Building A, last 7 days
-- Using TimescaleDB time_bucket for efficient aggregation
SELECT
time_bucket('15 minutes', r.time) AS period,
e.name AS equipment,
s.name AS sensor,
AVG(r.value) AS avg_vibration,
MAX(r.value) AS peak_vibration,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY r.value) AS p99_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY period, e.name, s.name
ORDER BY period DESC, peak_vibration DESC;
Average Daily Values Grouped by Manufacturer
-- Average daily temperature per facility, grouped by equipment manufacturer
SELECT
f.name AS facility,
e.manufacturer,
time_bucket('1 day', r.time) AS day,
AVG(r.value) AS avg_temperature,
COUNT(DISTINCT s.id) AS sensor_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE s.sensor_type = 'temperature'
AND r.time > NOW() - INTERVAL '30 days'
GROUP BY f.name, e.manufacturer, day
ORDER BY f.name, e.manufacturer, day;
Equipment with Sensors Exceeding Their Range
-- Find equipment where any sensor exceeded its max_range in the past month
SELECT
f.name AS facility,
e.name AS equipment,
e.manufacturer,
s.name AS sensor,
s.sensor_type,
s.max_range AS threshold,
MAX(r.value) AS peak_value,
COUNT(*) FILTER (WHERE r.value > s.max_range) AS exceedance_count,
MIN(r.time) FILTER (WHERE r.value > s.max_range) AS first_exceedance,
MAX(r.time) FILTER (WHERE r.value > s.max_range) AS last_exceedance
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE r.time > NOW() - INTERVAL '30 days'
AND s.max_range IS NOT NULL
GROUP BY f.name, e.name, e.manufacturer, s.name, s.sensor_type, s.max_range
HAVING COUNT(*) FILTER (WHERE r.value > s.max_range) > 0
ORDER BY exceedance_count DESC;
Readings Before and After Maintenance
-- Compare sensor readings 24 hours before and after a maintenance event
WITH maintenance AS (
SELECT id, equipment_id, performed_at, maintenance_type
FROM maintenance_logs
WHERE id = 456 -- specific maintenance event
),
before_maintenance AS (
SELECT
s.name AS sensor,
s.sensor_type,
AVG(r.value) AS avg_value,
STDDEV(r.value) AS stddev_value,
'before' AS period
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN maintenance m ON s.equipment_id = m.equipment_id
WHERE r.time BETWEEN m.performed_at - INTERVAL '24 hours' AND m.performed_at
GROUP BY s.name, s.sensor_type
),
after_maintenance AS (
SELECT
s.name AS sensor,
s.sensor_type,
AVG(r.value) AS avg_value,
STDDEV(r.value) AS stddev_value,
'after' AS period
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN maintenance m ON s.equipment_id = m.equipment_id
WHERE r.time BETWEEN m.performed_at AND m.performed_at + INTERVAL '24 hours'
GROUP BY s.name, s.sensor_type
)
SELECT
b.sensor,
b.sensor_type,
b.avg_value AS avg_before,
a.avg_value AS avg_after,
ROUND(((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0) * 100)::numeric, 2)
AS pct_change,
b.stddev_value AS stddev_before,
a.stddev_value AS stddev_after
FROM before_maintenance b
JOIN after_maintenance a ON a.sensor = b.sensor
ORDER BY ABS((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0)) DESC;
Anomaly Events with Full Context
-- Anomaly events for FANUC robots installed in 2024, with full context
SELECT
ae.id AS anomaly_id,
ae.anomaly_type,
ae.severity,
ae.start_time,
ae.end_time,
ae.value_at_detection,
s.name AS sensor,
s.sensor_type,
s.max_range,
e.name AS equipment,
e.manufacturer,
e.model,
e.install_date,
f.name AS facility
FROM anomaly_events ae
JOIN sensors s ON s.id = ae.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE e.manufacturer = 'FANUC'
AND e.equipment_type = 'robot'
AND e.install_date >= '2024-01-01'
AND ae.start_time > NOW() - INTERVAL '90 days'
ORDER BY ae.severity DESC, ae.start_time DESC;
Python Query Service
Wrapping these queries in a service class provides a clean interface for application code:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import json

import asyncpg


@dataclass
class SensorReading:
    time: datetime
    sensor_id: int
    sensor_name: str
    equipment_name: str
    facility_name: str
    sensor_type: str
    value: float
    unit: str


class QueryService:
    """Combines metadata filtering with time-series queries."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_readings(
        self,
        facility: Optional[str] = None,
        equipment_type: Optional[str] = None,
        manufacturer: Optional[str] = None,
        sensor_type: Optional[str] = None,
        production_line: Optional[str] = None,
        tags: Optional[dict] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        bucket_interval: str = '1 hour',
    ) -> list[dict]:
        """
        Flexible query combining metadata filters with time-series aggregation.
        """
        if start is None:
            start = datetime.utcnow() - timedelta(hours=24)
        if end is None:
            end = datetime.utcnow()

        conditions = ["r.time >= $1", "r.time <= $2"]
        params: list = [start, end]
        param_idx = 3
        if facility:
            conditions.append(f"f.name = ${param_idx}")
            params.append(facility)
            param_idx += 1
        if equipment_type:
            conditions.append(f"e.equipment_type = ${param_idx}")
            params.append(equipment_type)
            param_idx += 1
        if manufacturer:
            conditions.append(f"e.manufacturer = ${param_idx}")
            params.append(manufacturer)
            param_idx += 1
        if sensor_type:
            conditions.append(f"s.sensor_type = ${param_idx}")
            params.append(sensor_type)
            param_idx += 1
        if production_line:
            conditions.append(f"e.production_line = ${param_idx}")
            params.append(production_line)
            param_idx += 1
        if tags:
            conditions.append(f"s.tags @> ${param_idx}::jsonb")
            params.append(json.dumps(tags))
            param_idx += 1

        where_clause = " AND ".join(conditions)
        # Note: bucket_interval is interpolated into the SQL string, so it must
        # come from trusted code or be validated against a whitelist of intervals.
        query = f"""
            SELECT
                time_bucket('{bucket_interval}', r.time) AS bucket,
                s.id AS sensor_id,
                s.name AS sensor_name,
                s.sensor_type,
                s.unit,
                e.name AS equipment_name,
                e.manufacturer,
                f.name AS facility_name,
                AVG(r.value) AS avg_value,
                MIN(r.value) AS min_value,
                MAX(r.value) AS max_value,
                COUNT(*) AS sample_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where_clause}
            GROUP BY bucket, s.id, s.name, s.sensor_type, s.unit,
                     e.name, e.manufacturer, f.name
            ORDER BY bucket DESC, sensor_name
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
        return [dict(r) for r in rows]

    async def get_equipment_health(self, equipment_id: int) -> dict:
        """Get comprehensive health status for a piece of equipment."""
        async with self.pool.acquire() as conn:
            # Equipment metadata
            equipment = await conn.fetchrow("""
                SELECT e.*, f.name AS facility_name
                FROM equipment e
                JOIN facilities f ON f.id = e.facility_id
                WHERE e.id = $1
            """, equipment_id)
            # Latest readings from all sensors
            latest_readings = await conn.fetch("""
                SELECT DISTINCT ON (s.id)
                    s.id AS sensor_id, s.name, s.sensor_type, s.unit,
                    s.min_range, s.max_range,
                    r.time AS last_reading_time,
                    r.value AS last_value,
                    CASE
                        WHEN r.value > s.max_range THEN 'exceeded'
                        WHEN r.value < s.min_range THEN 'below_range'
                        ELSE 'normal'
                    END AS range_status
                FROM sensors s
                LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                    AND r.time > NOW() - INTERVAL '1 hour'
                WHERE s.equipment_id = $1 AND s.is_active = TRUE
                ORDER BY s.id, r.time DESC
            """, equipment_id)
            # Recent anomalies
            anomalies = await conn.fetch("""
                SELECT ae.*, s.name AS sensor_name, s.sensor_type
                FROM anomaly_events ae
                JOIN sensors s ON s.id = ae.sensor_id
                WHERE s.equipment_id = $1
                  AND ae.start_time > NOW() - INTERVAL '7 days'
                ORDER BY ae.start_time DESC
                LIMIT 20
            """, equipment_id)
            # Last maintenance
            last_maintenance = await conn.fetchrow("""
                SELECT * FROM maintenance_logs
                WHERE equipment_id = $1
                ORDER BY performed_at DESC LIMIT 1
            """, equipment_id)
        return {
            "equipment": dict(equipment) if equipment else None,
            "sensors": [dict(r) for r in latest_readings],
            "recent_anomalies": [dict(a) for a in anomalies],
            "last_maintenance": dict(last_maintenance) if last_maintenance else None,
            "overall_status": self._calculate_status(latest_readings, anomalies),
        }

    @staticmethod
    def _calculate_status(readings, anomalies) -> str:
        critical_anomalies = [a for a in anomalies if a['severity'] == 'critical']
        exceeded_sensors = [r for r in readings if r['range_status'] == 'exceeded']
        if critical_anomalies or len(exceeded_sensors) > 2:
            return "critical"
        if exceeded_sensors or any(a['severity'] == 'high' for a in anomalies):
            return "warning"
        return "healthy"
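The status rollup is simple enough to verify in isolation. Here is the same logic as a standalone function (mirroring _calculate_status above, with plain dicts standing in for database rows):

```python
def calculate_status(readings: list[dict], anomalies: list[dict]) -> str:
    """Roll per-sensor range checks and anomaly severities into one status."""
    critical_anomalies = [a for a in anomalies if a["severity"] == "critical"]
    exceeded = [r for r in readings if r["range_status"] == "exceeded"]
    if critical_anomalies or len(exceeded) > 2:
        return "critical"
    if exceeded or any(a["severity"] == "high" for a in anomalies):
        return "warning"
    return "healthy"

# One sensor over range and only a low-severity anomaly → "warning"
readings = [{"range_status": "normal"}, {"range_status": "exceeded"}]
anomalies = [{"severity": "low"}]
```

Keeping the rollup as a pure function of row dicts means the escalation rules can be unit-tested without a database connection.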
API Design for Metadata + Time-Series
A well-designed API layer makes the combined metadata/time-series system accessible to dashboards, mobile apps, and other services. Here is a FastAPI implementation that exposes the key endpoints:
from datetime import datetime, timedelta
from typing import Optional
import re

import asyncpg
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(title="Sensor Data API")
pool: Optional[asyncpg.Pool] = None

# Bucket values are interpolated into SQL below, so allow only simple
# interval strings such as '5 minutes' or '1 hour'.
BUCKET_PATTERN = re.compile(r"^\d+\s+(seconds?|minutes?|hours?|days?)$")


@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/sensordb",
        min_size=5, max_size=20
    )


@app.on_event("shutdown")
async def shutdown():
    await pool.close()


# ---- Pydantic Models ----

class FacilityResponse(BaseModel):
    id: int
    name: str
    location: Optional[str]
    facility_type: str
    status: str
    equipment_count: int


class EquipmentResponse(BaseModel):
    id: int
    name: str
    equipment_type: str
    manufacturer: Optional[str]
    model: Optional[str]
    status: str
    sensor_count: int
    production_line: Optional[str]


class SensorReadingResponse(BaseModel):
    time: datetime
    value: float
    sensor_name: str
    sensor_type: str
    unit: str


class EquipmentHealthResponse(BaseModel):
    equipment_id: int
    equipment_name: str
    facility: str
    status: str
    sensors: list[dict]
    recent_anomalies: list[dict]
    last_maintenance: Optional[dict]


# ---- Endpoints ----

@app.get("/facilities/{facility_id}/equipment",
         response_model=list[EquipmentResponse])
async def list_equipment(facility_id: int):
    """List all equipment in a facility with metadata."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT e.id, e.name, e.equipment_type, e.manufacturer,
                   e.model, e.status, e.production_line,
                   COUNT(s.id) AS sensor_count
            FROM equipment e
            LEFT JOIN sensors s ON s.equipment_id = e.id AND s.is_active = TRUE
            WHERE e.facility_id = $1
            GROUP BY e.id
            ORDER BY e.production_line, e.name
        """, facility_id)
    if not rows:
        raise HTTPException(404, "Facility not found or has no equipment")
    return [dict(r) for r in rows]


@app.get("/sensors/{sensor_id}/readings",
         response_model=list[SensorReadingResponse])
async def get_sensor_readings(
    sensor_id: int,
    start: datetime = Query(default_factory=lambda: datetime.utcnow() - timedelta(hours=24)),
    end: datetime = Query(default_factory=datetime.utcnow),
    bucket: str = Query(default="15 minutes",
                        description="Aggregation interval, e.g. '5 minutes', '1 hour'"),
):
    """Get time-series readings for a sensor with metadata context."""
    # bucket is interpolated into the SQL string, so reject anything unexpected
    if not BUCKET_PATTERN.match(bucket):
        raise HTTPException(400, "Invalid bucket interval")
    async with pool.acquire() as conn:
        # Verify sensor exists and get metadata
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.unit
            FROM sensors s WHERE s.id = $1
        """, sensor_id)
        if not sensor:
            raise HTTPException(404, "Sensor not found")
        readings = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                AVG(r.value) AS value
            FROM sensor_readings r
            WHERE r.sensor_id = $1
              AND r.time BETWEEN $2 AND $3
            GROUP BY time_bucket('{bucket}', r.time)
            ORDER BY time DESC
        """, sensor_id, start, end)
    return [
        {
            "time": r["time"],
            "value": round(r["value"], 4),
            "sensor_name": sensor["name"],
            "sensor_type": sensor["sensor_type"],
            "unit": sensor["unit"],
        }
        for r in readings
    ]


@app.get("/equipment/{equipment_id}/health",
         response_model=EquipmentHealthResponse)
async def get_equipment_health(equipment_id: int):
    """
    Combined health view: latest sensor readings + metadata + anomalies.
    Single endpoint that crosses metadata and time-series boundaries.
    """
    query_service = QueryService(pool)
    health = await query_service.get_equipment_health(equipment_id)
    if not health["equipment"]:
        raise HTTPException(404, "Equipment not found")
    return {
        "equipment_id": equipment_id,
        "equipment_name": health["equipment"]["name"],
        "facility": health["equipment"]["facility_name"],
        "status": health["overall_status"],
        "sensors": health["sensors"],
        "recent_anomalies": health["recent_anomalies"],
        "last_maintenance": health["last_maintenance"],
    }


@app.get("/facilities/{facility_id}/sensors/readings")
async def get_facility_readings(
    facility_id: int,
    sensor_type: Optional[str] = None,
    manufacturer: Optional[str] = None,
    production_line: Optional[str] = None,
    start: datetime = Query(
        default_factory=lambda: datetime.utcnow() - timedelta(hours=24)
    ),
    end: datetime = Query(default_factory=datetime.utcnow),
    bucket: str = "1 hour",
):
    """
    Get aggregated readings for all sensors in a facility,
    with optional metadata filters.
    """
    if not BUCKET_PATTERN.match(bucket):
        raise HTTPException(400, "Invalid bucket interval")
    conditions = ["f.id = $1", "r.time >= $2", "r.time <= $3"]
    params = [facility_id, start, end]
    idx = 4
    if sensor_type:
        conditions.append(f"s.sensor_type = ${idx}")
        params.append(sensor_type)
        idx += 1
    if manufacturer:
        conditions.append(f"e.manufacturer = ${idx}")
        params.append(manufacturer)
        idx += 1
    if production_line:
        conditions.append(f"e.production_line = ${idx}")
        params.append(production_line)
        idx += 1
    where = " AND ".join(conditions)
    async with pool.acquire() as conn:
        rows = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                e.name AS equipment,
                e.manufacturer,
                s.name AS sensor,
                s.sensor_type,
                s.unit,
                AVG(r.value) AS avg_value,
                MAX(r.value) AS max_value,
                MIN(r.value) AS min_value
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where}
            GROUP BY time_bucket('{bucket}', r.time),
                     e.name, e.manufacturer, s.name, s.sensor_type, s.unit
            ORDER BY time DESC
        """, *params)
    return [dict(r) for r in rows]
The /equipment/{id}/health endpoint demonstrates the power of combining metadata and time-series in a single API response. A dashboard can render equipment details, live sensor values, anomaly alerts, and maintenance history from a single API call.
Handling Scale
A system with 500 sensors at 1 Hz generates about 43 million readings per day. At 10 Hz, that jumps to 432 million. Within a year, you are looking at 15-150 billion rows. Without a data lifecycle strategy, storage costs will grow linearly forever.
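The arithmetic behind those numbers, as a quick sanity check:

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

def readings_per_day(sensor_count: int, hz: float) -> int:
    """Total readings generated per day across all sensors."""
    return int(sensor_count * hz * SECONDS_PER_DAY)

per_day_1hz = readings_per_day(500, 1)      # 43,200,000 — about 43 million
per_day_10hz = readings_per_day(500, 10)    # 432,000,000
per_year_10hz = per_day_10hz * 365          # ~158 billion rows per year
```

At a typical 20–40 bytes per compressed row, the 10 Hz case adds several terabytes per year, which is exactly why the tiered retention below matters.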
Data Retention Policies
| Data Tier | Resolution | Retention | Storage | Use Case |
|---|---|---|---|---|
| Raw | Full resolution (1-1000 Hz) | 30 days | TimescaleDB (compressed) | Real-time dashboards, debugging |
| Downsampled | 1-minute or 5-minute averages | 1 year | TimescaleDB continuous aggregate | Trend analysis, weekly reports |
| Aggregated | Hourly or daily summaries | Forever | PostgreSQL regular table | Historical comparisons, audits |
| Archived | Full resolution | 7 years | Parquet on S3/Glacier | Compliance, ML retraining |
Implementing this with TimescaleDB:
-- Continuous aggregate: 5-minute downsampling (auto-maintained)
CREATE MATERIALIZED VIEW readings_5min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS bucket,
sensor_id,
AVG(value) AS avg_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median_value,
COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;
SELECT add_continuous_aggregate_policy('readings_5min',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '30 minutes',
schedule_interval => INTERVAL '30 minutes'
);
-- Continuous aggregate: hourly (built on top of 5-min aggregate)
CREATE MATERIALIZED VIEW readings_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', bucket) AS bucket,
sensor_id,
AVG(avg_value) AS avg_value,
MIN(min_value) AS min_value,
MAX(max_value) AS max_value,
SUM(sample_count) AS sample_count
FROM readings_5min
GROUP BY time_bucket('1 hour', bucket), sensor_id
WITH NO DATA;
SELECT add_continuous_aggregate_policy('readings_hourly',
start_offset => INTERVAL '4 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- Drop raw data after 30 days
SELECT add_retention_policy('sensor_readings', INTERVAL '30 days');
-- Keep 5-minute aggregates for 1 year
SELECT add_retention_policy('readings_5min', INTERVAL '1 year');
Once add_retention_policy drops a chunk, the raw data is gone for good. If you need long-term access to raw data for compliance or ML training, export it to Parquet on S3 before the retention window elapses.
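The export step needs scheduling logic of its own: every daily partition must be archived before the retention policy reaches it. A hypothetical helper for that decision (the function name and the two-day safety margin are assumptions, and the actual export to Parquet is out of scope here):

```python
from datetime import date, timedelta

def partitions_due_for_archive(today: date, newest_archived: date,
                               retention_days: int = 30,
                               safety_margin_days: int = 2) -> list[date]:
    """Days whose raw data should be exported now: everything newer than
    the last archived day but close enough to the retention cutoff that
    it will be dropped within the safety margin."""
    drop_cutoff = today - timedelta(days=retention_days - safety_margin_days)
    due = []
    day = newest_archived + timedelta(days=1)
    while day <= drop_cutoff:
        due.append(day)
        day += timedelta(days=1)
    return due

# With a 30-day policy, on March 1 everything up to Feb 1 is due for export
due = partitions_due_for_archive(date(2026, 3, 1), date(2026, 1, 28))
```

A cron job or scheduled worker can run this daily and export each returned day to Parquet before TimescaleDB drops the corresponding chunk.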
Real-World Example: Manufacturing Plant
Let us walk through a complete real-world scenario to tie everything together. Imagine a manufacturing plant with the following setup:
- 3 buildings (A, B, C) on a single campus
- 50 machines: 20 CNC machines (FANUC, DMG Mori), 15 robots (ABB, KUKA), 10 conveyors, 5 pumps
- 500 sensors: vibration, temperature, pressure, current, torque, flow rate
- Average sampling rate: 10 Hz (some vibration sensors at 1 kHz for spectral analysis)
The Schema
-- Seed the metadata
INSERT INTO facilities (name, location, facility_type, commissioned_date, status) VALUES
('Building A', 'North Campus, Chicago IL', 'manufacturing', '2019-03-15', 'active'),
('Building B', 'North Campus, Chicago IL', 'manufacturing', '2021-07-01', 'active'),
('Building C', 'North Campus, Chicago IL', 'warehouse', '2022-01-10', 'active');
-- Sample equipment (showing pattern, not all 50)
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer, model,
serial_number, install_date, production_line, status,
operating_params) VALUES
(1, 'CNC-A01', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', 'FN-2024-0891',
'2024-03-15', 'Line 1', 'operational',
'{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}'),
(1, 'CNC-A02', 'cnc', 'DMG Mori', 'DMU 50', 'DM-2023-4521',
'2023-09-01', 'Line 1', 'operational',
'{"max_spindle_rpm": 20000, "tool_capacity": 30, "axes": 5}'),
(1, 'Robot-A01', 'robot', 'ABB', 'IRB 6700', 'ABB-2024-1122',
'2024-06-10', 'Line 2', 'operational',
'{"axes": 6, "payload_kg": 150, "reach_mm": 2650}'),
(2, 'CNC-B01', 'cnc', 'FANUC', 'Robodrill a-D21LiB5ADV', 'FN-2024-1205',
'2024-11-20', 'Line 3', 'operational',
'{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}');
-- Sensors for CNC-A01 (typical: vibration, temperature, spindle current)
INSERT INTO sensors (equipment_id, name, sensor_type, unit, sampling_rate_hz,
min_range, max_range, calibration_date, is_active, tags) VALUES
(1, 'CNC-A01-VIB-X', 'vibration', 'mm/s', 1000, 0, 50,
'2026-01-15', TRUE, '{"axis": "x", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-VIB-Y', 'vibration', 'mm/s', 1000, 0, 50,
'2026-01-15', TRUE, '{"axis": "y", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-TEMP-SPINDLE', 'temperature', 'celsius', 1, 10, 85,
'2026-02-01', TRUE, '{"location": "spindle_bearing"}'),
(1, 'CNC-A01-CURRENT', 'current', 'ampere', 10, 0, 30,
'2026-02-01', TRUE, '{"phase": "main_spindle"}');
Data Flow
In this plant, the data flow works as follows:
- Sensors output analog/digital signals to edge PLCs (Programmable Logic Controllers)
- Edge PLCs digitize and publish to an MQTT broker via Sparkplug B protocol
- Telegraf agents (one per building) subscribe to MQTT, buffer locally, and forward to the central database
- TimescaleDB receives inserts via the Telegraf PostgreSQL output plugin
- The ingestion validator (our Python script) runs as a sidecar, monitoring for unknown sensor IDs
At 500 sensors averaging 10 Hz, the system handles approximately 5,000 inserts per second during normal operation, with bursts up to 50,000/s when high-frequency vibration captures are triggered. TimescaleDB on a single node (16 vCPU, 64 GB RAM, NVMe SSD) handles this comfortably with batch inserts.
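Batch sizing drives that comfort margin. A quick model of the ingestion pipeline's flush behavior (the batch size and timer values are illustrative, not measurements from the deployment above):

```python
def flushes_per_second(inserts_per_second: float, batch_size: int,
                       flush_interval_s: float) -> float:
    """Expected batch flushes per second: the size trigger dominates when
    the batch fills before the periodic timer fires; otherwise the timer
    sets the floor on flush frequency."""
    fill_rate = inserts_per_second / batch_size   # flushes forced by batch size
    timer_rate = 1.0 / flush_interval_s           # flushes forced by the timer
    return max(fill_rate, timer_rate)

# 5,000 rows/s with 1,000-row batches and a 1 s timer → 5 flushes/s
steady = flushes_per_second(5_000, 1_000, 1.0)
# A 50,000 rows/s vibration burst with the same settings → 50 flushes/s
burst = flushes_per_second(50_000, 1_000, 1.0)
```

Keeping flushes in the tens per second, rather than one INSERT per reading, is what lets a single TimescaleDB node absorb the bursts.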
Dashboard Queries
The operations team uses a Grafana dashboard backed by these queries:
-- Dashboard Panel 1: Plant Overview — current status of all equipment
SELECT
f.name AS building,
e.name AS machine,
e.equipment_type,
e.status AS equipment_status,
COUNT(s.id) FILTER (WHERE s.is_active) AS active_sensors,
COUNT(ae.id) FILTER (WHERE ae.severity IN ('high', 'critical')
AND ae.start_time > NOW() - INTERVAL '24 hours') AS critical_anomalies_24h,
MAX(ml.performed_at) AS last_maintenance
FROM equipment e
JOIN facilities f ON f.id = e.facility_id
LEFT JOIN sensors s ON s.equipment_id = e.id
LEFT JOIN anomaly_events ae ON ae.sensor_id = s.id
LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
GROUP BY f.name, e.name, e.equipment_type, e.status
ORDER BY critical_anomalies_24h DESC, f.name, e.name;
-- Dashboard Panel 2: Vibration trends for Line 3 CNC machines (last 24h)
SELECT
time_bucket('15 minutes', r.time) AS period,
e.name AS machine,
AVG(r.value) AS avg_vibration,
MAX(r.value) AS peak_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
AND e.equipment_type = 'cnc'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY period, e.name
ORDER BY period, e.name;
-- Dashboard Panel 3: Equipment needing attention
-- (sensors exceeding 80% of their max range)
SELECT
e.name AS machine,
s.name AS sensor,
s.sensor_type,
s.max_range,
latest.last_value,
ROUND((latest.last_value / s.max_range * 100)::numeric, 1) AS pct_of_max
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
CROSS JOIN LATERAL (
SELECT value AS last_value
FROM sensor_readings
WHERE sensor_id = s.id
ORDER BY time DESC
LIMIT 1
) latest
WHERE s.is_active = TRUE
AND s.max_range IS NOT NULL
AND latest.last_value > s.max_range * 0.8
ORDER BY pct_of_max DESC;
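Panel 3's threshold logic is worth pinning down precisely. A pure-Python version of the same 80%-of-range check (a sketch for unit testing; the dashboard itself computes this in SQL as shown above):

```python
def pct_of_max(last_value: float, max_range: float) -> float:
    """Latest reading expressed as a percentage of the sensor's calibrated maximum."""
    return round(last_value / max_range * 100, 1)

def needs_attention(last_value: float, max_range: float,
                    threshold: float = 0.8) -> bool:
    """Flag sensors running above 80% of their calibrated range."""
    return last_value > max_range * threshold

# A vibration sensor with max_range 50 mm/s reading 42 mm/s is at 84%
vib_pct = pct_of_max(42.0, 50.0)
```

Keeping the threshold in one place (SQL view or shared constant) avoids the dashboard and the alerting worker drifting apart on what "needs attention" means.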
Anomaly Detection Integration
When an ML anomaly detection model flags unusual behavior, it writes to the anomaly_events table with full metadata context. A Python worker might look like this:
async def record_anomaly(
    pool: asyncpg.Pool,
    sensor_id: int,
    anomaly_type: str,
    severity: str,
    value_at_detection: float,
    model_version: str,
):
    """Record an anomaly event with metadata validation."""
    async with pool.acquire() as conn:
        # Validate sensor exists and get context for logging
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.max_range,
                   e.name AS equipment, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE s.id = $1
        """, sensor_id)
        if not sensor:
            raise ValueError(f"Sensor {sensor_id} not found in metadata")
        anomaly_id = await conn.fetchval("""
            INSERT INTO anomaly_events
                (sensor_id, start_time, anomaly_type, severity,
                 value_at_detection, model_version)
            VALUES ($1, NOW(), $2, $3, $4, $5)
            RETURNING id
        """, sensor_id, anomaly_type, severity, value_at_detection, model_version)
        logger.warning(
            f"Anomaly #{anomaly_id}: {severity} {anomaly_type} on "
            f"{sensor['equipment']}/{sensor['name']} ({sensor['facility']}) "
            f"value={value_at_detection} (max={sensor['max_range']})"
        )
        return anomaly_id
Common Pitfalls
After reviewing dozens of sensor data architectures, these are the mistakes I see most often:
| Pitfall | Impact | Solution |
|---|---|---|
| Denormalizing metadata into every time-series row | 10-20x storage bloat, metadata updates require backfilling billions of rows | Store only sensor_id in time-series, JOIN at query time |
| No foreign key validation | Orphaned readings accumulate, 10-20% of data becomes unlinkable | Validate sensor_id at ingestion, run periodic quality checks |
| Single database for everything | Either metadata or time-series queries suffer poor performance | Use TimescaleDB (best of both) or a split architecture |
| Not planning for sensor changes | Historical data misinterpreted after recalibration or replacement | Implement SCD Type 2 for sensor history |
| Ignoring time zones | Time shifts corrupt analysis, especially across multi-site deployments | Always use TIMESTAMPTZ, store in UTC, convert at display time |
| Missing indexes on JOIN columns | Cross-domain queries take minutes instead of milliseconds | Index (sensor_id, time DESC) on time-series, all FKs on metadata |
| No retention policy | Storage costs grow linearly forever, query performance degrades | Tiered retention: raw (30d) → downsampled (1y) → archived (S3) |
| String-based sensor identification | Name changes break links, inconsistent naming across teams | Use integer IDs as primary key, names as human-readable labels |
Conclusion
Managing metadata and time-series data together is not a luxury — it is a fundamental requirement for any system that wants to derive actionable insights from sensor data. The sensor_id is the bridge between what your sensors are (metadata) and what they are measuring (time-series), and your architecture must make it trivially easy to cross that bridge in both directions.
For most teams, PostgreSQL with TimescaleDB is the right starting point. You get native SQL JOINs across metadata and time-series tables, a single connection string, familiar tooling, and excellent performance up to terabyte scale. When you outgrow that, the patterns for InfluxDB integration, Parquet data lakes, and TDengine super tables give you a clear upgrade path.
The key design principles to remember:
- Separate but connected: Metadata in relational tables, time-series in optimized storage, linked by sensor_id
- Sensor registry: Treat sensors as first-class entities with rich metadata (type, unit, range, calibration, sampling rate)
- Slowly changing dimensions: Track metadata changes over time so historical data can be correctly interpreted
- Validate at ingestion: Never insert a time-series reading without confirming the sensor exists in metadata
- Tiered retention: Raw data (30 days) → downsampled (1 year) → aggregated (forever) → archived (cold storage)
- Index the bridge: Composite indexes on (sensor_id, time DESC) make cross-domain queries fast
The complete schema, ingestion pipeline, query patterns, and API design in this guide give you a production-ready blueprint. Start with the PostgreSQL + TimescaleDB pattern, add the sensor registry and validation layer, implement continuous aggregates for downsampling, and build your API layer with FastAPI. You will have a system where "show me all vibration anomalies from Building A's CNC machines installed after 2023" is a query that returns results in milliseconds, not a question that leaves your team staring at their screens.
References
- TimescaleDB Documentation — Official docs for hypertables, continuous aggregates, compression, and retention policies
- PostgreSQL ltree Extension — Hierarchical tree-like data type for modeling facility structures
- InfluxDB Documentation — Time-series database documentation including Flux query language
- TDengine Super Table Concepts — Understanding super tables, sub-tables, and tags
- Apache Parquet Format — Columnar storage format specification for data lake architectures
- DuckDB Documentation — In-process analytical database for querying Parquet files
- FastAPI Documentation — Modern Python web framework used in the API design examples
- SQLAlchemy Documentation — Python ORM for metadata table management
- Telegraf Plugin Documentation — Agent for collecting and writing metrics from MQTT, Modbus, and other sources
- MQTT Specification — Lightweight messaging protocol widely used in IoT sensor networks