Managing Metadata and Time-Series Data Together: A Practical Guide for Facility and Sensor Signal Systems

Introduction

A factory floor with 500 sensors is generating 2.6 billion data points per year. Every vibration reading, every temperature spike, every pressure anomaly is faithfully captured and stored. But when an engineer asks a straightforward question — “Show me all vibration anomalies from Building A’s CNC machines installed after 2023” — the team stares blankly at their screens. The data is there, scattered across three different systems, but nobody can answer that question in under ten minutes.

This scenario plays out in manufacturing plants, energy grids, building management systems, and IoT deployments worldwide. The root cause is always the same: the team treated metadata and time-series data as separate problems, and never designed the bridge between them.

In any industrial, manufacturing, or IoT system, you are dealing with two fundamentally different types of data that must work in concert. First, there is metadata — information about facilities, equipment, sensors, locations, configurations, maintenance history, and calibration records. This data is relational, hierarchical, and changes slowly. Second, there is time-series data — the actual sensor signals (temperature, vibration, pressure, torque, current, flow rate) streaming in at high frequency, sometimes thousands of readings per second. This data is append-only, voluminous, and indexed by time.

The relationship between these two data types is what makes everything work. A sensor reading of “47.3” means nothing without knowing that sensor S-0142 is a thermocouple mounted on a FANUC CNC spindle in Building A, calibrated last month, with an operating range of 15–85°C. The sensor_id is the bridge — metadata tells you what, time-series tells you when and how much.

Most teams get this relationship wrong. They embed metadata in every time-series row (creating massive bloat), or they completely separate the two without proper foreign keys (creating orphaned data), or they force everything into a single database that performs poorly on at least one workload. The result is the same: queries that should take milliseconds take minutes, data that should be connected is isolated, and engineers who should be finding anomalies are instead fighting with data infrastructure.

This guide is the definitive reference for designing a system that manages metadata and time-series data together correctly. We will walk through four architecture patterns, complete SQL schemas, Python code with SQLAlchemy and FastAPI, ingestion pipelines, query optimization strategies, and a real-world manufacturing example. By the end, you will have everything you need to build a system where that “CNC vibration anomalies in Building A” query returns results in under a second.

The Data Model Challenge

Before diving into solutions, let us clearly understand why these two data types are so difficult to manage together. They have fundamentally different characteristics, and a database architecture that is optimal for one is almost always suboptimal for the other.

Metadata: Relational, Hierarchical, Slowly Changing

Facility and sensor metadata follows a natural hierarchy. A typical industrial deployment looks like this:

Organization → Site → Building → Production Line → Machine → Component → Sensor

Each level in this hierarchy carries rich attributes. A sensor record might include: sensor type, unit of measurement, sampling rate in Hz, minimum and maximum operating range, calibration date, firmware version, installation date, and the equipment it is mounted on. A machine record includes manufacturer, model, serial number, commissioning date, maintenance schedule, and operating parameters.

This data is relational — sensors belong to equipment, equipment belongs to production lines, production lines belong to buildings. It is hierarchical — you often need to query “all sensors in Building A” which means traversing the tree. It is slowly changing — a sensor gets recalibrated, a machine gets moved to a different production line, firmware gets updated. And it is schema-rich — each entity type has many attributes with different data types, constraints, and relationships.

Time-Series: Append-Only, High Volume, Time-Indexed

Sensor readings are the opposite in nearly every way. A typical reading is just three fields: timestamp, sensor_id, and value. Maybe a few additional channels for multi-axis sensors (x, y, z for accelerometers). The schema is narrow and rarely changes.

But the volume is enormous. A single vibration sensor sampling at 1 kHz generates 86.4 million readings per day. Even at a modest 1 Hz sampling rate, 500 sensors produce 43.2 million readings per day — roughly 15.8 billion per year. This data is append-only (you almost never update a historical reading), time-indexed (every query includes a time range), and write-heavy (ingestion throughput is critical).
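The volume arithmetic above is worth wiring into capacity planning rather than redoing by hand. A minimal sketch in plain Python (the 20-byte row estimate is an assumption for a timestamp/int/double triple, before per-row storage overhead):

```python
def readings_per_day(sensor_count: int, hz: float) -> int:
    """Readings produced per day by sensor_count sensors sampling at hz."""
    return int(sensor_count * hz * 86_400)

def storage_per_year_gb(sensor_count: int, hz: float, bytes_per_row: int = 20) -> float:
    """Rough uncompressed storage per year; 20 bytes/row assumes
    timestamp (8) + sensor_id (4) + value (8), excluding row overhead."""
    return readings_per_day(sensor_count, hz) * 365 * bytes_per_row / 1e9

# One vibration sensor at 1 kHz: 86.4 million readings/day
print(readings_per_day(1, 1000))       # 86400000
# 500 sensors at 1 Hz: 43.2 million readings/day, ~15.8 billion/year
print(readings_per_day(500, 1))        # 43200000
print(readings_per_day(500, 1) * 365)  # 15768000000
```

Running these numbers before choosing an architecture tells you immediately whether you are in single-node PostgreSQL territory or need the heavier patterns discussed later.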

Characteristics Comparison

| Characteristic | Metadata | Time-Series |
|----------------|----------|-------------|
| Schema | Wide, complex, many tables | Narrow (timestamp, id, value) |
| Volume | Thousands to millions of rows | Billions to trillions of rows |
| Write pattern | Infrequent updates, inserts | Continuous high-throughput appends |
| Read pattern | Lookups, JOINs, tree traversal | Range scans by time, aggregations |
| Relationships | Rich foreign keys, hierarchies | Single FK (sensor_id) |
| Mutability | Updates and deletes common | Append-only, rarely modified |
| Indexing | B-tree, GIN, full-text | Time-partitioned, BRIN |
| Retention | Keep forever | Tiered (raw → downsampled → archived) |

Common Mistakes

Teams typically fall into one of three traps:

Mistake 1: Embedding metadata in every time-series row. Instead of storing (timestamp, sensor_id, value), they store (timestamp, sensor_id, value, building_name, machine_name, manufacturer, sensor_type, unit, ...). A row that should be 24 bytes becomes 500 bytes. With billions of rows, this means terabytes of redundant data, slower queries, and a nightmare when metadata changes (do you backfill every historical row?).
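The cost of that bloat is easy to quantify. A back-of-the-envelope sketch in plain Python, using the illustrative 24-byte and 500-byte row sizes from above and the ~15.8-billion-row yearly volume:

```python
def table_size_tb(rows: int, bytes_per_row: int) -> float:
    """Approximate table size in decimal terabytes."""
    return rows * bytes_per_row / 1e12

rows = 15_768_000_000  # ~15.8 billion rows: 500 sensors at 1 Hz for a year
lean = table_size_tb(rows, 24)      # (timestamp, sensor_id, value)
bloated = table_size_tb(rows, 500)  # with embedded metadata strings

print(f"lean:    {lean:.2f} TB")     # lean:    0.38 TB
print(f"bloated: {bloated:.2f} TB")  # bloated: 7.88 TB
```

Roughly twenty times the storage, before indexes, to hold exactly the same information.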

Mistake 2: Complete separation without proper linking. Metadata lives in PostgreSQL, time-series lives in InfluxDB, and the only link is a sensor name string that someone typed manually. Sensor names get changed, new sensors get added to the time-series database without being registered in the metadata database, and suddenly 15% of your readings are orphaned — you have data from sensors that do not exist in your metadata system.

Mistake 3: Using one database for everything. Forcing all data into PostgreSQL means time-series queries are slow (no time-partitioning, no columnar compression). Forcing everything into InfluxDB means metadata queries are impossible (no JOINs, no foreign keys, no transactions). Neither database excels at the other’s workload.

Key Takeaway: The sensor_id is the bridge between metadata and time-series. Your architecture must make it easy to start from either side — filter by metadata attributes and then fetch time-series, or detect time-series anomalies and then look up metadata context.

Architecture Patterns

There is no single “right” architecture for combining metadata and time-series data. The best choice depends on your scale, team expertise, existing infrastructure, and query patterns. Here are four proven patterns, from the most commonly recommended to the most specialized.

Pattern 1: PostgreSQL + TimescaleDB (Recommended)

This is the pattern I recommend for most teams, and the one we will spend the most time on. TimescaleDB is a PostgreSQL extension that adds time-series capabilities — hypertables, automatic partitioning by time, continuous aggregates, and compression — while keeping full PostgreSQL functionality. Because it runs inside PostgreSQL, you get native SQL JOINs between your metadata tables and your time-series hypertables.

Here is the complete schema:

-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- ============================================
-- METADATA TABLES
-- ============================================

CREATE TABLE facilities (
    id          SERIAL PRIMARY KEY,
    name        VARCHAR(200) NOT NULL,
    location    VARCHAR(500),
    facility_type VARCHAR(50) NOT NULL,  -- 'manufacturing', 'warehouse', 'office'
    commissioned_date DATE,
    status      VARCHAR(20) DEFAULT 'active',
    metadata    JSONB DEFAULT '{}',
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE equipment (
    id              SERIAL PRIMARY KEY,
    facility_id     INTEGER NOT NULL REFERENCES facilities(id),
    name            VARCHAR(200) NOT NULL,
    equipment_type  VARCHAR(50) NOT NULL,  -- 'cnc', 'robot', 'conveyor', 'pump'
    manufacturer    VARCHAR(200),
    model           VARCHAR(200),
    serial_number   VARCHAR(100) UNIQUE,
    install_date    DATE,
    production_line VARCHAR(100),
    status          VARCHAR(20) DEFAULT 'operational',
    operating_params JSONB DEFAULT '{}',
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_equipment_facility ON equipment(facility_id);
CREATE INDEX idx_equipment_type ON equipment(equipment_type);
CREATE INDEX idx_equipment_manufacturer ON equipment(manufacturer);
CREATE INDEX idx_equipment_line ON equipment(production_line);

CREATE TABLE sensors (
    id                SERIAL PRIMARY KEY,
    equipment_id      INTEGER NOT NULL REFERENCES equipment(id),
    name              VARCHAR(200) NOT NULL,
    sensor_type       VARCHAR(50) NOT NULL,   -- 'temperature', 'vibration', 'pressure'
    unit              VARCHAR(20) NOT NULL,    -- 'celsius', 'mm/s', 'bar', 'A'
    sampling_rate_hz  REAL DEFAULT 1.0,
    min_range         REAL,
    max_range         REAL,
    calibration_date  DATE,
    firmware_version  VARCHAR(50),
    is_active         BOOLEAN DEFAULT TRUE,
    tags              JSONB DEFAULT '{}',
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    updated_at        TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_sensors_equipment ON sensors(equipment_id);
CREATE INDEX idx_sensors_type ON sensors(sensor_type);
CREATE INDEX idx_sensors_active ON sensors(is_active) WHERE is_active = TRUE;
CREATE INDEX idx_sensors_tags ON sensors USING GIN(tags);

CREATE TABLE maintenance_logs (
    id              SERIAL PRIMARY KEY,
    equipment_id    INTEGER NOT NULL REFERENCES equipment(id),
    maintenance_type VARCHAR(50) NOT NULL,  -- 'preventive', 'corrective', 'calibration'
    description     TEXT,
    performed_at    TIMESTAMPTZ NOT NULL,
    completed_at    TIMESTAMPTZ,
    technician      VARCHAR(200),
    parts_replaced  JSONB DEFAULT '[]',
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_maintenance_equipment ON maintenance_logs(equipment_id);
CREATE INDEX idx_maintenance_time ON maintenance_logs(performed_at);

-- ============================================
-- TIME-SERIES TABLES (TimescaleDB Hypertables)
-- ============================================

CREATE TABLE sensor_readings (
    time        TIMESTAMPTZ NOT NULL,
    sensor_id   INTEGER NOT NULL REFERENCES sensors(id),
    value       DOUBLE PRECISION NOT NULL
);

SELECT create_hypertable('sensor_readings', 'time');

CREATE INDEX idx_readings_sensor_time ON sensor_readings (sensor_id, time DESC);

-- Enable compression (after 7 days)
ALTER TABLE sensor_readings SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'time DESC'
);

SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');

-- Anomaly events table
CREATE TABLE anomaly_events (
    id              SERIAL PRIMARY KEY,
    sensor_id       INTEGER NOT NULL REFERENCES sensors(id),
    start_time      TIMESTAMPTZ NOT NULL,
    end_time        TIMESTAMPTZ,
    anomaly_type    VARCHAR(50) NOT NULL,  -- 'threshold', 'trend', 'pattern'
    severity        VARCHAR(20) NOT NULL,  -- 'low', 'medium', 'high', 'critical'
    value_at_detection DOUBLE PRECISION,
    model_version   VARCHAR(50),
    notes           TEXT,
    acknowledged    BOOLEAN DEFAULT FALSE,
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_anomaly_sensor ON anomaly_events(sensor_id);
CREATE INDEX idx_anomaly_time ON anomaly_events(start_time);
Tip: The compress_segmentby = 'sensor_id' setting is critical. It tells TimescaleDB to group compressed data by sensor, which means queries filtered by sensor_id only decompress the relevant segments. Without this, every query would decompress entire chunks.

Now let us see the power of native JOINs. Here are queries that cross the metadata/time-series boundary effortlessly:

-- Query 1: Average temperature for all sensors in Building A, last 24 hours
SELECT
    f.name AS facility,
    e.name AS equipment,
    s.name AS sensor,
    AVG(r.value) AS avg_temp,
    MIN(r.value) AS min_temp,
    MAX(r.value) AS max_temp
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
  AND s.sensor_type = 'temperature'
  AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY f.name, e.name, s.name
ORDER BY avg_temp DESC;

-- Query 2: FANUC machines with vibration exceeding threshold
SELECT
    e.name AS machine,
    e.model,
    s.name AS sensor,
    s.max_range AS threshold,
    MAX(r.value) AS peak_vibration,
    COUNT(*) AS exceedance_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.manufacturer = 'FANUC'
  AND s.sensor_type = 'vibration'
  AND r.value > s.max_range
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, e.model, s.name, s.max_range
ORDER BY peak_vibration DESC;

-- Query 3: Compare vibration across CNC machines on Production Line 3
SELECT
    e.name AS machine,
    time_bucket('1 hour', r.time) AS hour,
    AVG(r.value) AS avg_vibration,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY r.value) AS p95_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
  AND e.equipment_type = 'cnc'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, hour
ORDER BY e.name, hour;

Notice how each query seamlessly combines metadata filters (facility name, manufacturer, production line, sensor type) with time-series operations (time ranges, aggregations, percentiles). This is the primary advantage of the PostgreSQL + TimescaleDB pattern — a single SQL statement can traverse the entire data model.
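The shape of that start-from-metadata JOIN can be exercised end to end with nothing but the standard library. The sketch below uses an in-memory SQLite database standing in for PostgreSQL (table and column names mirror the schema above; hypertable features and realistic data volumes are omitted):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE facilities (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE equipment  (id INTEGER PRIMARY KEY, facility_id INTEGER, name TEXT);
    CREATE TABLE sensors    (id INTEGER PRIMARY KEY, equipment_id INTEGER,
                             name TEXT, sensor_type TEXT);
    CREATE TABLE sensor_readings (time TEXT, sensor_id INTEGER, value REAL);
""")
conn.execute("INSERT INTO facilities VALUES (1, 'Building A')")
conn.execute("INSERT INTO equipment VALUES (10, 1, 'CNC-001')")
conn.execute("INSERT INTO sensors VALUES (100, 10, 'spindle_temp', 'temperature')")
conn.executemany(
    "INSERT INTO sensor_readings VALUES (?, ?, ?)",
    [("2026-04-01T10:00:00", 100, 61.0), ("2026-04-01T11:00:00", 100, 65.0)],
)

# Metadata filters and a time-series aggregation in one statement
row = conn.execute("""
    SELECT f.name, e.name, s.name, AVG(r.value)
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN equipment e ON e.id = s.equipment_id
    JOIN facilities f ON f.id = e.facility_id
    WHERE f.name = 'Building A' AND s.sensor_type = 'temperature'
    GROUP BY f.name, e.name, s.name
""").fetchone()
print(row)  # ('Building A', 'CNC-001', 'spindle_temp', 63.0)
```

The same four-table JOIN runs unchanged against PostgreSQL + TimescaleDB, which is precisely the point: one query planner sees the whole data model.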

Pattern 2: PostgreSQL + InfluxDB

When InfluxDB is already part of your stack, or when write throughput exceeds what PostgreSQL can handle (generally above 500K inserts/second on a single node), a split architecture makes sense. Metadata stays in PostgreSQL, time-series goes to InfluxDB, and your application performs the JOIN.

import asyncpg
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

class DualDatabaseQuery:
    def __init__(self, pg_dsn: str, influx_url: str, influx_token: str, influx_org: str):
        self.pg_dsn = pg_dsn
        self.influx = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
        self.query_api = self.influx.query_api()

    async def get_readings_by_facility(
        self, facility_name: str, sensor_type: str, hours: int = 24
    ):
        # Step 1: Query metadata from PostgreSQL
        conn = await asyncpg.connect(self.pg_dsn)
        sensors = await conn.fetch("""
            SELECT s.id, s.name, e.name AS equipment_name
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE f.name = $1 AND s.sensor_type = $2 AND s.is_active = TRUE
        """, facility_name, sensor_type)
        await conn.close()

        if not sensors:
            return []

        # Step 2: Query time-series from InfluxDB, filtered by sensor IDs
        sensor_ids = [str(s['id']) for s in sensors]
        sensor_filter = ' or '.join(
            f'r["sensor_id"] == "{sid}"' for sid in sensor_ids
        )

        flux_query = f'''
        from(bucket: "sensor_data")
          |> range(start: -{hours}h)
          |> filter(fn: (r) => r["_measurement"] == "readings")
          |> filter(fn: (r) => {sensor_filter})
          |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        '''
        tables = self.query_api.query(flux_query)

        # Step 3: Merge metadata with time-series results
        sensor_lookup = {str(s['id']): s for s in sensors}
        results = []
        for table in tables:
            for record in table.records:
                sid = record.values.get("sensor_id")
                meta = sensor_lookup.get(sid, {})
                results.append({
                    "time": record.get_time(),
                    "sensor_id": sid,
                    "sensor_name": meta.get("name"),
                    "equipment": meta.get("equipment_name"),
                    "value": record.get_value(),
                })
        return results
        return results

Caution: The two-step query pattern (metadata first, then time-series) means your application is responsible for consistency. If a sensor is deleted from PostgreSQL but readings still exist in InfluxDB, you get orphaned data. Always validate sensor_id existence before writing to InfluxDB.
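One way to enforce that validation is a small in-process registry that caches the set of known sensor IDs with a TTL and is consulted before every InfluxDB write. A stdlib-only sketch — the fetch_ids callback and the 300-second TTL are assumptions; in practice the callback would run SELECT id FROM sensors WHERE is_active = TRUE:

```python
import time
from typing import Callable

class SensorRegistry:
    """Caches valid sensor IDs; refreshes via a callback after ttl seconds."""

    def __init__(self, fetch_ids: Callable[[], set[int]], ttl: float = 300.0):
        self.fetch_ids = fetch_ids  # e.g. queries the sensors metadata table
        self.ttl = ttl
        self._ids: set[int] = set()
        self._loaded_at = float("-inf")  # force a refresh on first use

    def is_valid(self, sensor_id: int) -> bool:
        # Refresh the cached ID set when it is older than the TTL
        if time.monotonic() - self._loaded_at > self.ttl:
            self._ids = self.fetch_ids()
            self._loaded_at = time.monotonic()
        return sensor_id in self._ids

registry = SensorRegistry(fetch_ids=lambda: {100, 101, 102}, ttl=300)
print(registry.is_valid(100))  # True
print(registry.is_valid(999))  # False -> drop or dead-letter the reading
```

The TTL is the consistency window you accept: a sensor deactivated in PostgreSQL can still be written for up to ttl seconds, which is usually a fine trade against querying metadata on every point.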

The PostgreSQL + InfluxDB pattern works, but you lose the elegance of native JOINs. Every cross-domain query requires two round-trips, and complex queries (like “compare vibration patterns across machines by manufacturer”) require substantial application-level logic. Use this pattern when you already have InfluxDB in production and migration is not feasible, or when your write throughput genuinely exceeds PostgreSQL/TimescaleDB limits.

Pattern 3: PostgreSQL + Parquet/Iceberg on S3

For very large-scale deployments (terabytes of time-series data) or when the primary consumer is batch ML training pipelines, storing time-series data as Parquet files on S3 is cost-effective and scalable. Metadata stays in PostgreSQL, and you join them at query time using DuckDB, Athena, or Spark.

import duckdb
import asyncpg
from pathlib import Path

class ParquetTimeSeriesQuery:
    """
    Time-series stored as Parquet files on S3, partitioned by:
    s3://data-lake/sensor_readings/sensor_id={id}/date={YYYY-MM-DD}/data.parquet
    """

    def __init__(self, pg_dsn: str, s3_base: str):
        self.pg_dsn = pg_dsn
        self.s3_base = s3_base
        self.duck = duckdb.connect()
        self.duck.execute("INSTALL httpfs; LOAD httpfs;")
        self.duck.execute("SET s3_region='us-east-1';")

    async def query_with_metadata(
        self, facility_name: str, sensor_type: str, start_date: str, end_date: str
    ):
        # Step 1: Get relevant sensor IDs from PostgreSQL
        conn = await asyncpg.connect(self.pg_dsn)
        sensors = await conn.fetch("""
            SELECT s.id, s.name, s.unit, e.name AS equipment,
                   e.manufacturer, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE f.name = $1 AND s.sensor_type = $2
        """, facility_name, sensor_type)
        await conn.close()

        # Step 2: Build Parquet glob paths for relevant sensors
        sensor_ids = [s['id'] for s in sensors]
        paths = [
            f"{self.s3_base}/sensor_id={sid}/date=*/data.parquet"
            for sid in sensor_ids
        ]

        # Step 3: Query with DuckDB; hive_partitioning exposes the
        # sensor_id and date path segments as queryable columns
        result = self.duck.execute(f"""
            SELECT
                sensor_id,
                date_trunc('hour', time) AS hour,
                AVG(value) AS avg_value,
                MAX(value) AS max_value,
                COUNT(*) AS reading_count
            FROM parquet_scan({paths}, hive_partitioning = true)
            WHERE time BETWEEN '{start_date}' AND '{end_date}'
            GROUP BY sensor_id, hour
            ORDER BY sensor_id, hour
        """).fetchdf()

        # Step 4: Merge with metadata
        sensor_lookup = {s['id']: dict(s) for s in sensors}
        result['equipment'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('equipment')
        )
        result['facility'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('facility')
        )
        return result

This pattern is best for data lakes and ML training pipelines where you need to process large volumes of historical data cost-effectively. Parquet’s columnar format provides excellent compression (10-20x vs CSV), and partitioning by sensor_id and date means queries only read relevant files. However, it is poorly suited for real-time queries or dashboards that need sub-second response times.
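Partition pruning in this layout largely comes down to generating only the paths you need. Instead of the date=* wildcard used above, a date-bounded query can enumerate per-day paths, so the scan never touches files outside the window. A stdlib-only sketch (the path template matches the docstring's layout; the names are illustrative):

```python
from datetime import date, timedelta

def partition_paths(base: str, sensor_ids: list[int],
                    start: date, end: date) -> list[str]:
    """One Parquet path per (sensor, day), so the scan reads only
    partitions inside the requested window."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    return [
        f"{base}/sensor_id={sid}/date={d.isoformat()}/data.parquet"
        for sid in sensor_ids
        for d in days
    ]

paths = partition_paths("s3://data-lake/sensor_readings", [100, 101],
                        date(2026, 4, 1), date(2026, 4, 2))
print(len(paths))  # 4
print(paths[0])    # s3://data-lake/sensor_readings/sensor_id=100/date=2026-04-01/data.parquet
```

For wide date ranges, passing an explicit path list like this keeps both S3 listing costs and bytes scanned proportional to the query, not to the archive.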

Pattern 4: TDengine Super Tables

TDengine takes a radically different approach. Its “super table” concept embeds metadata as tags directly alongside time-series data. Each physical sensor gets a sub-table that inherits from a super table, and tags (metadata) are stored only once per sub-table, not repeated in every row.

-- Create a super table with tags (metadata) and columns (time-series)
CREATE STABLE sensor_readings (
    ts          TIMESTAMP,
    value       DOUBLE,
    quality     INT
) TAGS (
    facility    NCHAR(200),
    building    NCHAR(100),
    equipment   NCHAR(200),
    manufacturer NCHAR(200),
    sensor_type NCHAR(50),
    unit        NCHAR(20),
    line        NCHAR(100)
);

-- Create sub-tables for each sensor (tags are set once)
CREATE TABLE sensor_0001 USING sensor_readings TAGS (
    'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'vibration', 'mm/s', 'Line 3'
);

CREATE TABLE sensor_0002 USING sensor_readings TAGS (
    'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'temperature', 'celsius', 'Line 3'
);

-- Insert data (just timestamp + values, no metadata repetition)
INSERT INTO sensor_0001 VALUES (NOW(), 4.52, 100);
INSERT INTO sensor_0002 VALUES (NOW(), 67.3, 100);

-- Query across all sensors using metadata tags
SELECT
    facility,
    equipment,
    AVG(value) AS avg_vibration
FROM sensor_readings
WHERE sensor_type = 'vibration'
  AND facility = 'Plant Chicago'
  AND ts > NOW() - 24h
GROUP BY facility, equipment;

TDengine’s approach is elegant for IoT: the metadata is right there with the data, tags are indexed automatically, and you do not need a separate metadata database. The downside is that complex metadata relationships (maintenance logs, calibration history, hierarchical queries) are difficult to model with flat tags. If your metadata is simple and relatively static, TDengine is worth considering. If you need rich relational metadata, stick with Pattern 1 or 2.

Pattern Comparison

| Criteria | PG + TimescaleDB | PG + InfluxDB | PG + Parquet/S3 | TDengine |
|----------|------------------|---------------|-----------------|----------|
| Complexity | Low | Medium | Medium-High | Low |
| Native JOINs | Yes | No (app-level) | No (query engine) | Tags only |
| Write throughput | 100K-500K rows/s | 1M+ rows/s | Batch (unlimited) | 1M+ rows/s |
| Query flexibility | Full SQL | Flux + SQL | SQL (DuckDB/Athena) | SQL subset |
| Metadata richness | Full relational | Full relational | Full relational | Flat tags only |
| Scalability | TB scale | TB scale | PB scale | TB scale |
| Best for | Most teams | Existing InfluxDB | Data lakes, ML | Simple IoT |

Detailed Schema Design Best Practices

Regardless of which architecture pattern you choose, certain schema design principles apply universally. Let us walk through the most important ones.

Hierarchical Facility Modeling

Facility hierarchies are inherently tree-structured. You need to efficiently answer queries like “give me all sensors in Building A”, which means finding every piece of equipment on every production line in that building. There are two good approaches in PostgreSQL.

Approach 1: The ltree extension

CREATE EXTENSION IF NOT EXISTS ltree;

-- Add a path column to each entity
ALTER TABLE facilities ADD COLUMN path ltree;
ALTER TABLE equipment ADD COLUMN path ltree;
ALTER TABLE sensors ADD COLUMN path ltree;

-- Example paths
-- Facility: 'org.chicago'
-- Equipment: 'org.chicago.building_a.line_3.cnc_001'
-- Sensor: 'org.chicago.building_a.line_3.cnc_001.vibration_x'

CREATE INDEX idx_facility_path ON facilities USING GIST(path);
CREATE INDEX idx_equipment_path ON equipment USING GIST(path);
CREATE INDEX idx_sensor_path ON sensors USING GIST(path);

-- Find all sensors under Building A (any depth)
SELECT s.* FROM sensors s
WHERE s.path <@ 'org.chicago.building_a';

-- Find all equipment exactly 2 levels below org.chicago
SELECT e.* FROM equipment e
WHERE e.path ~ 'org.chicago.*{2}';
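ltree labels may contain only letters, digits, and underscores, so human-readable names like “Building A” or “CNC-001” must be normalized before they become path components. A stdlib-only helper sketch — the exact normalization rules here are an assumption; what matters is picking one convention and applying it everywhere paths are written:

```python
import re

def to_ltree_label(name: str) -> str:
    """Normalize a display name into a valid ltree label
    (lowercase, [a-z0-9_] only, no leading/trailing underscores)."""
    label = re.sub(r"[^a-z0-9]+", "_", name.lower())
    return label.strip("_")

def build_path(*names: str) -> str:
    """Join entity names into a dot-separated ltree path string."""
    return ".".join(to_ltree_label(n) for n in names)

print(build_path("Org", "Chicago", "Building A", "Line 3", "CNC-001"))
# org.chicago.building_a.line_3.cnc_001
```

Applying the helper at write time guarantees that the stored paths match the example paths shown in the comments above, so the <@ and lquery operators behave predictably.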

Approach 2: Recursive CTEs with adjacency list

If you prefer not to use extensions, recursive CTEs work well for moderate-sized hierarchies. An adjacency list needs the facilities table to carry a self-referencing parent_id column (site contains building, building contains area, and so on), added with ALTER TABLE facilities ADD COLUMN parent_id INTEGER REFERENCES facilities(id). The recursion then stays within facilities, and equipment is joined in at the end:

-- Find all equipment under Building A, including nested sub-facilities
WITH RECURSIVE facility_tree AS (
    -- Base case: the target facility
    SELECT id, name
    FROM facilities
    WHERE name = 'Building A'

    UNION ALL

    -- Recursive case: facilities nested under anything already in the tree
    SELECT f.id, f.name
    FROM facilities f
    JOIN facility_tree ft ON f.parent_id = ft.id
)
SELECT e.*
FROM equipment e
JOIN facility_tree ft ON e.facility_id = ft.id;

Slowly Changing Dimensions (SCD Type 2)

Equipment moves between production lines. Sensors get recalibrated. Firmware gets updated. If you simply overwrite the old value, you lose the ability to correctly interpret historical data. A vibration reading from last month should be evaluated against the calibration that was active at that time, not today's calibration.

SCD Type 2 solves this by keeping a history of changes with effective date ranges:

CREATE TABLE sensor_history (
    id              SERIAL PRIMARY KEY,
    sensor_id       INTEGER NOT NULL REFERENCES sensors(id),
    equipment_id    INTEGER NOT NULL REFERENCES equipment(id),
    calibration_date DATE,
    min_range       REAL,
    max_range       REAL,
    firmware_version VARCHAR(50),
    effective_from  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    effective_to    TIMESTAMPTZ,  -- NULL means "current"
    is_current      BOOLEAN DEFAULT TRUE
);

CREATE INDEX idx_sensor_history_current
    ON sensor_history(sensor_id) WHERE is_current = TRUE;

CREATE INDEX idx_sensor_history_range
    ON sensor_history(sensor_id, effective_from, effective_to);

-- When recalibrating a sensor:
-- Step 1: Close the current record
UPDATE sensor_history
SET effective_to = NOW(), is_current = FALSE
WHERE sensor_id = 42 AND is_current = TRUE;

-- Step 2: Insert new record
INSERT INTO sensor_history
    (sensor_id, equipment_id, calibration_date, min_range, max_range,
     firmware_version, effective_from, is_current)
VALUES
    (42, 15, '2026-04-01', 0, 100, 'v3.2.1', NOW(), TRUE);

-- Query: What was the calibration when this anomaly was detected?
SELECT sh.*
FROM sensor_history sh
JOIN anomaly_events ae ON ae.sensor_id = sh.sensor_id
WHERE ae.id = 789
  AND ae.start_time BETWEEN sh.effective_from
      AND COALESCE(sh.effective_to, '9999-12-31'::timestamptz);
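The same as-of lookup can be done in application code when the history rows are already in hand, for instance when annotating a batch of anomalies. A stdlib-only sketch — the dict shape mirrors sensor_history, an effective_to of None means “current”, and the half-open [effective_from, effective_to) convention avoids a timestamp matching two records at a boundary:

```python
from datetime import datetime
from typing import Optional

def record_effective_at(history: list[dict], at: datetime) -> Optional[dict]:
    """Return the history record whose [effective_from, effective_to)
    interval contains the timestamp, or None if no record covers it."""
    for rec in history:
        ends = rec["effective_to"]
        if rec["effective_from"] <= at and (ends is None or at < ends):
            return rec
    return None

history = [
    {"max_range": 80, "effective_from": datetime(2026, 1, 1),
     "effective_to": datetime(2026, 4, 1)},
    {"max_range": 100, "effective_from": datetime(2026, 4, 1),
     "effective_to": None},  # current calibration
]
print(record_effective_at(history, datetime(2026, 3, 15))["max_range"])  # 80
print(record_effective_at(history, datetime(2026, 4, 2))["max_range"])   # 100
```

Whichever side does the lookup, the rule is the same: a reading is always interpreted against the calibration that was in force when it was taken.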

JSONB for Flexible Attributes

Not every piece of equipment has the same attributes. A CNC machine has spindle speed and tool count, a conveyor has belt speed and length, a robot has axis count and payload capacity. Rather than creating separate tables for each equipment type, use JSONB columns for type-specific attributes:

-- Equipment with flexible operating parameters
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer,
                       model, operating_params)
VALUES
(1, 'CNC-001', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', '{
    "max_spindle_rpm": 24000,
    "tool_capacity": 21,
    "axes": 5,
    "max_feed_rate_mm_min": 54000
}'::jsonb),
(1, 'Robot-001', 'robot', 'ABB', 'IRB 6700', '{
    "axes": 6,
    "payload_kg": 150,
    "reach_mm": 2650,
    "repeatability_mm": 0.05
}'::jsonb);

-- Query: Find all robots with payload > 100kg
SELECT name, model, operating_params->>'payload_kg' AS payload
FROM equipment
WHERE equipment_type = 'robot'
  AND (operating_params->>'payload_kg')::numeric > 100;

-- Index for fast JSONB queries
CREATE INDEX idx_equipment_params ON equipment USING GIN(operating_params);

Tagging System for Ad-Hoc Grouping

Beyond the formal hierarchy, teams often need to group sensors by arbitrary criteria: "all sensors involved in the Q1 reliability study," "sensors monitored by the ML anomaly detection model," or "critical sensors requiring 24/7 alerting." A flexible tagging system supports this:

-- Sensors table already has a JSONB 'tags' column
-- Usage examples:
UPDATE sensors SET tags = '{
    "monitoring_group": "critical_24x7",
    "ml_model": "vibration_anomaly_v2",
    "study": "q1_reliability",
    "zone": "high_temperature"
}'::jsonb
WHERE id = 42;

-- Find all sensors in a monitoring group
SELECT s.*, e.name AS equipment
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
WHERE s.tags @> '{"monitoring_group": "critical_24x7"}';

-- Find sensors enrolled in a specific ML model
SELECT s.id, s.name, s.sensor_type
FROM sensors s
WHERE s.tags @> '{"ml_model": "vibration_anomaly_v2"}';

Data Ingestion Pipeline

Getting data from sensors into your database reliably is half the battle. A production ingestion pipeline typically follows this path:

Sensors → MQTT/Modbus → Kafka/MQTT Broker → Telegraf or Custom Consumer → Database

Telegraf Configuration

Telegraf is a popular agent for collecting and forwarding sensor data. Here is a configuration that reads from MQTT, enriches with metadata tags, and writes to TimescaleDB:

# telegraf.conf
[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt-broker:1883"]
  topics = ["sensors/+/readings"]
  data_format = "json"
  tag_keys = ["sensor_id"]
  json_time_key = "timestamp"
  json_time_format = "2006-01-02T15:04:05Z07:00"

# Enrich with metadata via inline tag mappings (regenerate this section
# from the metadata database whenever sensors change)
[[processors.enum]]
  [[processors.enum.mapping]]
    tag = "sensor_id"
    dest = "sensor_type"
    [processors.enum.mapping.value_mappings]
      "S-0001" = "vibration"
      "S-0002" = "temperature"

[[outputs.postgresql]]
  connection = "postgres://user:pass@localhost/sensordb"
  table_template = """
    INSERT INTO sensor_readings (time, sensor_id, value)
    VALUES ({time}, {sensor_id}::integer, {value})
  """

Python Ingestion Script with Validation

For more control, a custom Python ingestion script can validate sensor IDs against metadata, handle errors, and batch inserts:

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Optional

import asyncpg
import aiomqtt

logger = logging.getLogger(__name__)


class SensorDataIngester:
    """Ingests sensor readings with metadata validation."""

    def __init__(self, pg_dsn: str, mqtt_host: str, mqtt_port: int = 1883):
        self.pg_dsn = pg_dsn
        self.mqtt_host = mqtt_host
        self.mqtt_port = mqtt_port
        self.pool: Optional[asyncpg.Pool] = None
        self.valid_sensors: set[int] = set()
        self.batch: list[tuple] = []
        self.batch_size = 1000
        self.flush_interval = 5  # seconds

    async def start(self):
        """Initialize connections and start ingestion."""
        self.pool = await asyncpg.create_pool(self.pg_dsn, min_size=2, max_size=10)
        await self._load_valid_sensors()

        # Run batch flusher and MQTT listener concurrently
        await asyncio.gather(
            self._mqtt_listener(),
            self._periodic_flush(),
            self._periodic_sensor_refresh(),
        )

    async def _load_valid_sensors(self):
        """Load active sensor IDs from metadata database."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id FROM sensors WHERE is_active = TRUE"
            )
            self.valid_sensors = {row['id'] for row in rows}
            logger.info(f"Loaded {len(self.valid_sensors)} active sensors")

    async def _periodic_sensor_refresh(self):
        """Refresh valid sensor list every 5 minutes."""
        while True:
            await asyncio.sleep(300)
            await self._load_valid_sensors()

    async def _mqtt_listener(self):
        """Listen for sensor readings on MQTT."""
        async with aiomqtt.Client(self.mqtt_host, self.mqtt_port) as client:
            await client.subscribe("sensors/+/readings")
            async for message in client.messages:
                try:
                    payload = json.loads(message.payload)
                    sensor_id = int(payload['sensor_id'])

                    # Validate against metadata
                    if sensor_id not in self.valid_sensors:
                        logger.warning(
                            f"Rejected reading from unknown sensor {sensor_id}"
                        )
                        continue

                    # Normalize a trailing 'Z' for Python < 3.11, where
                    # fromisoformat cannot parse it
                    timestamp = datetime.fromisoformat(
                        payload['timestamp'].replace('Z', '+00:00')
                    )
                    if timestamp.tzinfo is None:
                        timestamp = timestamp.replace(tzinfo=timezone.utc)

                    value = float(payload['value'])

                    self.batch.append((timestamp, sensor_id, value))

                    if len(self.batch) >= self.batch_size:
                        await self._flush_batch()

                except (json.JSONDecodeError, KeyError, ValueError) as e:
                    logger.error(f"Invalid message: {e}")

    async def _periodic_flush(self):
        """Flush batch at regular intervals."""
        while True:
            await asyncio.sleep(self.flush_interval)
            if self.batch:
                await self._flush_batch()

    async def _flush_batch(self):
        """Insert batch of readings into TimescaleDB."""
        if not self.batch:
            return

        batch_to_insert = self.batch.copy()
        self.batch.clear()

        try:
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    """INSERT INTO sensor_readings (time, sensor_id, value)
                       VALUES ($1, $2, $3)""",
                    batch_to_insert
                )
                logger.info(f"Inserted {len(batch_to_insert)} readings")
        except Exception as e:
            logger.error(f"Batch insert failed: {e}")
            # Re-add failed batch for retry
            self.batch.extend(batch_to_insert)


# Data quality checks
async def check_data_quality(pool: asyncpg.Pool):
    """Detect common data quality issues."""
    async with pool.acquire() as conn:
        # Orphaned readings (sensor_id not in sensors table)
        orphaned = await conn.fetchval("""
            SELECT COUNT(DISTINCT r.sensor_id)
            FROM sensor_readings r
            LEFT JOIN sensors s ON s.id = r.sensor_id
            WHERE s.id IS NULL
              AND r.time > NOW() - INTERVAL '24 hours'
        """)

        # Sensors with no recent readings (possible failure)
        silent = await conn.fetch("""
            SELECT s.id, s.name, e.name AS equipment,
                   MAX(r.time) AS last_reading
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                AND r.time > NOW() - INTERVAL '24 hours'
            WHERE s.is_active = TRUE
            GROUP BY s.id, s.name, e.name
            HAVING MAX(r.time) IS NULL
               OR MAX(r.time) < NOW() - INTERVAL '1 hour'
        """)

        # Sensors with values outside their calibrated range
        out_of_range = await conn.fetch("""
            SELECT s.id, s.name, s.min_range, s.max_range,
                   MIN(r.value) AS min_val, MAX(r.value) AS max_val,
                   COUNT(*) AS violation_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            WHERE r.time > NOW() - INTERVAL '24 hours'
              AND (r.value < s.min_range OR r.value > s.max_range)
            GROUP BY s.id, s.name, s.min_range, s.max_range
        """)

        return {
            "orphaned_sensor_ids": orphaned,
            "silent_sensors": [dict(r) for r in silent],
            "out_of_range_sensors": [dict(r) for r in out_of_range],
        }
Tip: The _load_valid_sensors() method caches active sensor IDs in memory and refreshes every 5 minutes. This prevents a database round-trip for every incoming message while catching new sensor registrations within a reasonable window.
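The parsing and validation logic inside _mqtt_listener can also be factored out into a pure function, which makes it unit-testable without a broker or database. A sketch (the helper name is ours, not part of the script above):

```python
import json
from datetime import datetime, timezone
from typing import Optional


def parse_reading(raw: bytes, valid_sensors: set[int]) -> Optional[tuple]:
    """Parse one MQTT payload into a (timestamp, sensor_id, value) row.

    Returns None for malformed payloads or unknown sensors, mirroring
    the checks in _mqtt_listener.
    """
    try:
        payload = json.loads(raw)
        sensor_id = int(payload["sensor_id"])
        if sensor_id not in valid_sensors:
            return None
        # Normalize a trailing 'Z' for Python < 3.11
        ts = datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return (ts, sensor_id, float(payload["value"]))
    except (json.JSONDecodeError, KeyError, ValueError, TypeError):
        return None


# Known sensor: accepted; unknown sensor: rejected
ok = parse_reading(b'{"sensor_id": 7, "timestamp": "2025-06-01T12:00:00Z", "value": 3.5}', {7})
bad = parse_reading(b'{"sensor_id": 99, "timestamp": "2025-06-01T12:00:00Z", "value": 3.5}', {7})
```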

Handling Late-Arriving and Out-of-Order Data

In real-world deployments, data does not always arrive in order. Network delays, edge device buffering, and batch uploads from remote sites all produce out-of-order events. TimescaleDB handles this gracefully — inserts are not required to be in time order. However, if you are using continuous aggregates or materialized views, you need to configure a refresh policy that covers the maximum expected delay:

-- Continuous aggregate that tolerates late data (up to 1 hour)
CREATE MATERIALIZED VIEW hourly_averages
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;

-- Refresh policy: refresh the last 2 hours every 30 minutes
SELECT add_continuous_aggregate_policy('hourly_averages',
    start_offset => INTERVAL '2 hours',
    end_offset => INTERVAL '30 minutes',
    schedule_interval => INTERVAL '30 minutes'
);
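Before settling on a start_offset, it is worth measuring how much of your traffic actually arrives out of order. A minimal sketch of such a check over a stream of arrival-ordered timestamps (the helper is illustrative, not part of TimescaleDB):

```python
from datetime import datetime, timedelta
from typing import Optional


def count_late_events(timestamps: list[datetime]) -> int:
    """Count events whose timestamp is earlier than the latest
    timestamp already seen, i.e. out-of-order arrivals."""
    late = 0
    latest: Optional[datetime] = None
    for ts in timestamps:
        if latest is not None and ts < latest:
            late += 1
        else:
            latest = ts
    return late


base = datetime(2025, 6, 1)
# Events arrive in this order; minutes 3 and 4 arrive after minute 5
stream = [base + timedelta(minutes=m) for m in (1, 2, 5, 3, 4, 6)]
print(count_late_events(stream))  # 2
```

If a non-trivial fraction of events count as late, widen start_offset (and the schedule) until the aggregate's refresh window covers the observed delay.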

Querying Across Metadata and Time-Series

The true value of a well-designed schema emerges when you start writing queries that cross the metadata/time-series boundary. Here are five common query patterns with complete SQL and Python implementations.

All Readings by Location and Sensor Type

-- All vibration readings from sensors in Building A, last 7 days
-- Using TimescaleDB time_bucket for efficient aggregation
SELECT
    time_bucket('15 minutes', r.time) AS period,
    e.name AS equipment,
    s.name AS sensor,
    AVG(r.value) AS avg_vibration,
    MAX(r.value) AS peak_vibration,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY r.value) AS p99_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY period, e.name, s.name
ORDER BY period DESC, peak_vibration DESC;

Average Daily Values Grouped by Manufacturer

-- Average daily temperature per facility, grouped by equipment manufacturer
SELECT
    f.name AS facility,
    e.manufacturer,
    time_bucket('1 day', r.time) AS day,
    AVG(r.value) AS avg_temperature,
    COUNT(DISTINCT s.id) AS sensor_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE s.sensor_type = 'temperature'
  AND r.time > NOW() - INTERVAL '30 days'
GROUP BY f.name, e.manufacturer, day
ORDER BY f.name, e.manufacturer, day;

Equipment with Sensors Exceeding Their Range

-- Find equipment where any sensor exceeded its max_range in the past month
SELECT
    f.name AS facility,
    e.name AS equipment,
    e.manufacturer,
    s.name AS sensor,
    s.sensor_type,
    s.max_range AS threshold,
    MAX(r.value) AS peak_value,
    COUNT(*) FILTER (WHERE r.value > s.max_range) AS exceedance_count,
    MIN(r.time) FILTER (WHERE r.value > s.max_range) AS first_exceedance,
    MAX(r.time) FILTER (WHERE r.value > s.max_range) AS last_exceedance
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE r.time > NOW() - INTERVAL '30 days'
  AND s.max_range IS NOT NULL
GROUP BY f.name, e.name, e.manufacturer, s.name, s.sensor_type, s.max_range
HAVING COUNT(*) FILTER (WHERE r.value > s.max_range) > 0
ORDER BY exceedance_count DESC;

Readings Before and After Maintenance

-- Compare sensor readings 24 hours before and after a maintenance event
WITH maintenance AS (
    SELECT id, equipment_id, performed_at, maintenance_type
    FROM maintenance_logs
    WHERE id = 456  -- specific maintenance event
),
before_maintenance AS (
    SELECT
        s.name AS sensor,
        s.sensor_type,
        AVG(r.value) AS avg_value,
        STDDEV(r.value) AS stddev_value,
        'before' AS period
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN maintenance m ON s.equipment_id = m.equipment_id
    WHERE r.time BETWEEN m.performed_at - INTERVAL '24 hours' AND m.performed_at
    GROUP BY s.name, s.sensor_type
),
after_maintenance AS (
    SELECT
        s.name AS sensor,
        s.sensor_type,
        AVG(r.value) AS avg_value,
        STDDEV(r.value) AS stddev_value,
        'after' AS period
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN maintenance m ON s.equipment_id = m.equipment_id
    WHERE r.time BETWEEN m.performed_at AND m.performed_at + INTERVAL '24 hours'
    GROUP BY s.name, s.sensor_type
)
SELECT
    b.sensor,
    b.sensor_type,
    b.avg_value AS avg_before,
    a.avg_value AS avg_after,
    ROUND(((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0) * 100)::numeric, 2)
        AS pct_change,
    b.stddev_value AS stddev_before,
    a.stddev_value AS stddev_after
FROM before_maintenance b
JOIN after_maintenance a ON a.sensor = b.sensor
ORDER BY ABS((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0)) DESC;

Anomaly Events with Full Context

-- Anomaly events for FANUC robots installed in 2024, with full context
SELECT
    ae.id AS anomaly_id,
    ae.anomaly_type,
    ae.severity,
    ae.start_time,
    ae.end_time,
    ae.value_at_detection,
    s.name AS sensor,
    s.sensor_type,
    s.max_range,
    e.name AS equipment,
    e.manufacturer,
    e.model,
    e.install_date,
    f.name AS facility
FROM anomaly_events ae
JOIN sensors s ON s.id = ae.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE e.manufacturer = 'FANUC'
  AND e.equipment_type = 'robot'
  AND e.install_date >= '2024-01-01'
  AND ae.start_time > NOW() - INTERVAL '90 days'
ORDER BY ae.severity DESC, ae.start_time DESC;

Python Query Service

Wrapping these queries in a service class provides a clean interface for application code:

import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

import asyncpg


@dataclass
class SensorReading:
    time: datetime
    sensor_id: int
    sensor_name: str
    equipment_name: str
    facility_name: str
    sensor_type: str
    value: float
    unit: str


class QueryService:
    """Combines metadata filtering with time-series queries."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_readings(
        self,
        facility: Optional[str] = None,
        equipment_type: Optional[str] = None,
        manufacturer: Optional[str] = None,
        sensor_type: Optional[str] = None,
        production_line: Optional[str] = None,
        tags: Optional[dict] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        bucket_interval: str = '1 hour',
    ) -> list[dict]:
        """
        Flexible query combining metadata filters with time-series aggregation.
        """
        if start is None:
            start = datetime.utcnow() - timedelta(hours=24)
        if end is None:
            end = datetime.utcnow()

        # bucket_interval is interpolated into the SQL below, so validate
        # it strictly to prevent SQL injection.
        amount, _, unit = bucket_interval.partition(" ")
        if not amount.isdigit() or unit not in (
            "second", "seconds", "minute", "minutes",
            "hour", "hours", "day", "days",
        ):
            raise ValueError(f"Invalid bucket interval: {bucket_interval!r}")

        conditions = ["r.time >= $1", "r.time <= $2"]
        params: list = [start, end]
        param_idx = 3

        if facility:
            conditions.append(f"f.name = ${param_idx}")
            params.append(facility)
            param_idx += 1

        if equipment_type:
            conditions.append(f"e.equipment_type = ${param_idx}")
            params.append(equipment_type)
            param_idx += 1

        if manufacturer:
            conditions.append(f"e.manufacturer = ${param_idx}")
            params.append(manufacturer)
            param_idx += 1

        if sensor_type:
            conditions.append(f"s.sensor_type = ${param_idx}")
            params.append(sensor_type)
            param_idx += 1

        if production_line:
            conditions.append(f"e.production_line = ${param_idx}")
            params.append(production_line)
            param_idx += 1

        if tags:
            conditions.append(f"s.tags @> ${param_idx}::jsonb")
            params.append(json.dumps(tags))
            param_idx += 1

        where_clause = " AND ".join(conditions)

        query = f"""
            SELECT
                time_bucket('{bucket_interval}', r.time) AS bucket,
                s.id AS sensor_id,
                s.name AS sensor_name,
                s.sensor_type,
                s.unit,
                e.name AS equipment_name,
                e.manufacturer,
                f.name AS facility_name,
                AVG(r.value) AS avg_value,
                MIN(r.value) AS min_value,
                MAX(r.value) AS max_value,
                COUNT(*) AS sample_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where_clause}
            GROUP BY bucket, s.id, s.name, s.sensor_type, s.unit,
                     e.name, e.manufacturer, f.name
            ORDER BY bucket DESC, sensor_name
        """

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [dict(r) for r in rows]

    async def get_equipment_health(self, equipment_id: int) -> dict:
        """Get comprehensive health status for a piece of equipment."""
        async with self.pool.acquire() as conn:
            # Equipment metadata
            equipment = await conn.fetchrow("""
                SELECT e.*, f.name AS facility_name
                FROM equipment e
                JOIN facilities f ON f.id = e.facility_id
                WHERE e.id = $1
            """, equipment_id)

            # Latest readings from all sensors
            latest_readings = await conn.fetch("""
                SELECT DISTINCT ON (s.id)
                    s.id AS sensor_id, s.name, s.sensor_type, s.unit,
                    s.min_range, s.max_range,
                    r.time AS last_reading_time,
                    r.value AS last_value,
                    CASE
                        WHEN r.value > s.max_range THEN 'exceeded'
                        WHEN r.value < s.min_range THEN 'below_range'
                        ELSE 'normal'
                    END AS range_status
                FROM sensors s
                LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                    AND r.time > NOW() - INTERVAL '1 hour'
                WHERE s.equipment_id = $1 AND s.is_active = TRUE
                ORDER BY s.id, r.time DESC
            """, equipment_id)

            # Recent anomalies
            anomalies = await conn.fetch("""
                SELECT ae.*, s.name AS sensor_name, s.sensor_type
                FROM anomaly_events ae
                JOIN sensors s ON s.id = ae.sensor_id
                WHERE s.equipment_id = $1
                  AND ae.start_time > NOW() - INTERVAL '7 days'
                ORDER BY ae.start_time DESC
                LIMIT 20
            """, equipment_id)

            # Last maintenance
            last_maintenance = await conn.fetchrow("""
                SELECT * FROM maintenance_logs
                WHERE equipment_id = $1
                ORDER BY performed_at DESC LIMIT 1
            """, equipment_id)

            return {
                "equipment": dict(equipment) if equipment else None,
                "sensors": [dict(r) for r in latest_readings],
                "recent_anomalies": [dict(a) for a in anomalies],
                "last_maintenance": dict(last_maintenance) if last_maintenance else None,
                "overall_status": self._calculate_status(latest_readings, anomalies),
            }

    @staticmethod
    def _calculate_status(readings, anomalies) -> str:
        critical_anomalies = [a for a in anomalies if a['severity'] == 'critical']
        exceeded_sensors = [r for r in readings if r['range_status'] == 'exceeded']

        if critical_anomalies or len(exceeded_sensors) > 2:
            return "critical"
        elif exceeded_sensors or any(a['severity'] == 'high' for a in anomalies):
            return "warning"
        return "healthy"

API Design for Metadata + Time-Series

A well-designed API layer makes the combined metadata/time-series system accessible to dashboards, mobile apps, and other services. Here is a FastAPI implementation that exposes the key endpoints:

from datetime import datetime, timedelta
from typing import Optional

import asyncpg
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(title="Sensor Data API")
pool: Optional[asyncpg.Pool] = None


@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/sensordb",
        min_size=5, max_size=20
    )


@app.on_event("shutdown")
async def shutdown():
    await pool.close()


# ---- Pydantic Models ----

class FacilityResponse(BaseModel):
    id: int
    name: str
    location: Optional[str]
    facility_type: str
    status: str
    equipment_count: int


class EquipmentResponse(BaseModel):
    id: int
    name: str
    equipment_type: str
    manufacturer: Optional[str]
    model: Optional[str]
    status: str
    sensor_count: int
    production_line: Optional[str]


class SensorReadingResponse(BaseModel):
    time: datetime
    value: float
    sensor_name: str
    sensor_type: str
    unit: str


class EquipmentHealthResponse(BaseModel):
    equipment_id: int
    equipment_name: str
    facility: str
    status: str
    sensors: list[dict]
    recent_anomalies: list[dict]
    last_maintenance: Optional[dict]


# ---- Endpoints ----

@app.get("/facilities/{facility_id}/equipment",
         response_model=list[EquipmentResponse])
async def list_equipment(facility_id: int):
    """List all equipment in a facility with metadata."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT e.id, e.name, e.equipment_type, e.manufacturer,
                   e.model, e.status, e.production_line,
                   COUNT(s.id) AS sensor_count
            FROM equipment e
            LEFT JOIN sensors s ON s.equipment_id = e.id AND s.is_active = TRUE
            WHERE e.facility_id = $1
            GROUP BY e.id
            ORDER BY e.production_line, e.name
        """, facility_id)

        if not rows:
            raise HTTPException(404, "Facility not found or has no equipment")
        return [dict(r) for r in rows]


@app.get("/sensors/{sensor_id}/readings",
         response_model=list[SensorReadingResponse])
async def get_sensor_readings(
    sensor_id: int,
    start: datetime = Query(default_factory=lambda: datetime.utcnow() - timedelta(hours=24)),
    end: datetime = Query(default_factory=datetime.utcnow),
    bucket: str = Query(default="15 minutes",
                        description="Aggregation interval, e.g. '5 minutes', '1 hour'"),
):
    """Get time-series readings for a sensor with metadata context."""
    # `bucket` is user input interpolated into the SQL below; validate it
    # strictly to prevent SQL injection.
    amount, _, unit = bucket.partition(" ")
    if not amount.isdigit() or unit not in (
        "second", "seconds", "minute", "minutes", "hour", "hours", "day", "days",
    ):
        raise HTTPException(400, "Invalid bucket interval")

    async with pool.acquire() as conn:
        # Verify sensor exists and get metadata
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.unit
            FROM sensors s WHERE s.id = $1
        """, sensor_id)

        if not sensor:
            raise HTTPException(404, "Sensor not found")

        readings = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                AVG(r.value) AS value
            FROM sensor_readings r
            WHERE r.sensor_id = $1
              AND r.time BETWEEN $2 AND $3
            GROUP BY time_bucket('{bucket}', r.time)
            ORDER BY time DESC
        """, sensor_id, start, end)

        return [
            {
                "time": r["time"],
                "value": round(r["value"], 4),
                "sensor_name": sensor["name"],
                "sensor_type": sensor["sensor_type"],
                "unit": sensor["unit"],
            }
            for r in readings
        ]


@app.get("/equipment/{equipment_id}/health",
         response_model=EquipmentHealthResponse)
async def get_equipment_health(equipment_id: int):
    """
    Combined health view: latest sensor readings + metadata + anomalies.
    Single endpoint that crosses metadata and time-series boundaries.
    """
    query_service = QueryService(pool)
    health = await query_service.get_equipment_health(equipment_id)

    if not health["equipment"]:
        raise HTTPException(404, "Equipment not found")

    return {
        "equipment_id": equipment_id,
        "equipment_name": health["equipment"]["name"],
        "facility": health["equipment"]["facility_name"],
        "status": health["overall_status"],
        "sensors": health["sensors"],
        "recent_anomalies": health["recent_anomalies"],
        "last_maintenance": health["last_maintenance"],
    }


@app.get("/facilities/{facility_id}/sensors/readings")
async def get_facility_readings(
    facility_id: int,
    sensor_type: Optional[str] = None,
    manufacturer: Optional[str] = None,
    production_line: Optional[str] = None,
    start: datetime = Query(
        default_factory=lambda: datetime.utcnow() - timedelta(hours=24)
    ),
    end: datetime = Query(default_factory=datetime.utcnow),
    bucket: str = "1 hour",
):
    """
    Get aggregated readings for all sensors in a facility,
    with optional metadata filters.
    """
    # `bucket` is user input interpolated into the SQL below; validate it
    # strictly to prevent SQL injection.
    amount, _, unit = bucket.partition(" ")
    if not amount.isdigit() or unit not in (
        "second", "seconds", "minute", "minutes", "hour", "hours", "day", "days",
    ):
        raise HTTPException(400, "Invalid bucket interval")

    conditions = ["f.id = $1", "r.time >= $2", "r.time <= $3"]
    params = [facility_id, start, end]
    idx = 4

    if sensor_type:
        conditions.append(f"s.sensor_type = ${idx}")
        params.append(sensor_type)
        idx += 1

    if manufacturer:
        conditions.append(f"e.manufacturer = ${idx}")
        params.append(manufacturer)
        idx += 1

    if production_line:
        conditions.append(f"e.production_line = ${idx}")
        params.append(production_line)
        idx += 1

    where = " AND ".join(conditions)

    async with pool.acquire() as conn:
        rows = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                e.name AS equipment,
                e.manufacturer,
                s.name AS sensor,
                s.sensor_type,
                s.unit,
                AVG(r.value) AS avg_value,
                MAX(r.value) AS max_value,
                MIN(r.value) AS min_value
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where}
            GROUP BY time_bucket('{bucket}', r.time),
                     e.name, e.manufacturer, s.name, s.sensor_type, s.unit
            ORDER BY time DESC
        """, *params)

        return [dict(r) for r in rows]
Key Takeaway: The /equipment/{id}/health endpoint demonstrates the power of combining metadata and time-series in a single API response. A dashboard can render equipment details, live sensor values, anomaly alerts, and maintenance history from a single API call.

Handling Scale

A system with 500 sensors at 1 Hz generates about 43 million readings per day. At 10 Hz, that jumps to 432 million. Within a year, you are looking at 16-160 billion rows. Without a data lifecycle strategy, storage costs will grow linearly forever.
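The arithmetic behind those figures:

```python
SENSORS = 500
SECONDS_PER_DAY = 86_400

rows_per_day_1hz = SENSORS * 1 * SECONDS_PER_DAY     # 43,200,000
rows_per_day_10hz = SENSORS * 10 * SECONDS_PER_DAY   # 432,000,000

rows_per_year_1hz = rows_per_day_1hz * 365           # ~15.8 billion
rows_per_year_10hz = rows_per_day_10hz * 365         # ~157.7 billion
```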

Data Retention Policies

Data Tier   | Resolution                    | Retention | Storage                          | Use Case
Raw         | Full resolution (1-1000 Hz)   | 30 days   | TimescaleDB (compressed)         | Real-time dashboards, debugging
Downsampled | 1-minute or 5-minute averages | 1 year    | TimescaleDB continuous aggregate | Trend analysis, weekly reports
Aggregated  | Hourly or daily summaries     | Forever   | PostgreSQL regular table         | Historical comparisons, audits
Archived    | Full resolution               | 7 years   | Parquet on S3/Glacier            | Compliance, ML retraining
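To see why tiering matters, compare the rows each tier retains for a plant of 500 sensors at an average of 10 Hz. The numbers are a back-of-the-envelope sketch:

```python
SENSORS = 500
AVG_HZ = 10

raw_rows = SENSORS * AVG_HZ * 86_400 * 30    # 30 days of raw data
five_min_rows = SENSORS * 365 * 24 * 12      # one year of 5-minute buckets
hourly_rows_per_year = SENSORS * 365 * 24    # hourly summaries, per year

# raw_rows ~ 13.0 billion, five_min_rows ~ 52.6 million,
# hourly_rows_per_year ~ 4.4 million: each tier is two to three
# orders of magnitude smaller than the one above it.
```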


Implementing this with TimescaleDB:

-- Continuous aggregate: 5-minute downsampling (auto-maintained)
CREATE MATERIALIZED VIEW readings_5min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', time) AS bucket,
    sensor_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median_value,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('readings_5min',
    start_offset => INTERVAL '2 hours',
    end_offset => INTERVAL '30 minutes',
    schedule_interval => INTERVAL '30 minutes'
);

-- Continuous aggregate: hourly (built on top of 5-min aggregate)
CREATE MATERIALIZED VIEW readings_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', bucket) AS bucket,
    sensor_id,
    AVG(avg_value) AS avg_value,
    MIN(min_value) AS min_value,
    MAX(max_value) AS max_value,
    SUM(sample_count) AS sample_count
FROM readings_5min
GROUP BY time_bucket('1 hour', bucket), sensor_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('readings_hourly',
    start_offset => INTERVAL '4 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Drop raw data after 30 days
SELECT add_retention_policy('sensor_readings', INTERVAL '30 days');

-- Keep 5-minute aggregates for 1 year
SELECT add_retention_policy('readings_5min', INTERVAL '1 year');
Caution: Before enabling retention policies, make sure your archival pipeline is working. Once add_retention_policy drops a chunk, the raw data is gone. Export to Parquet on S3 first if you need long-term raw data access for compliance or ML training.
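One way to enforce that ordering is to gate retention behind an archive check: list the chunks the policy is about to drop and confirm each has been exported first. A simplified sketch assuming daily chunks (the helper is hypothetical):

```python
from datetime import date, timedelta


def chunks_pending_archive(chunk_days: list[date], today: date,
                           retention_days: int = 30) -> list[date]:
    """Return the daily chunks older than the retention cutoff;
    these are about to be dropped and must be archived first."""
    cutoff = today - timedelta(days=retention_days)
    return sorted(d for d in chunk_days if d < cutoff)


pending = chunks_pending_archive(
    [date(2024, 12, 25), date(2025, 1, 10), date(2025, 1, 25)],
    today=date(2025, 2, 1),
)
print(pending)  # [datetime.date(2024, 12, 25)]
```

Run the check before each retention cycle; only enable (or resume) the policy once the pending list is empty.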

Real-World Example: Manufacturing Plant

Let us walk through a complete real-world scenario to tie everything together. Imagine a manufacturing plant with the following setup:

  • 3 buildings (A, B, C) on a single campus
  • 50 machines: 20 CNC machines (FANUC, DMG Mori), 15 robots (ABB, KUKA), 10 conveyors, 5 pumps
  • 500 sensors: vibration, temperature, pressure, current, torque, flow rate
  • Average sampling rate: 10 Hz (some vibration sensors at 1 kHz for spectral analysis)

The Schema

-- Seed the metadata
INSERT INTO facilities (name, location, facility_type, commissioned_date, status) VALUES
('Building A', 'North Campus, Chicago IL', 'manufacturing', '2019-03-15', 'active'),
('Building B', 'North Campus, Chicago IL', 'manufacturing', '2021-07-01', 'active'),
('Building C', 'North Campus, Chicago IL', 'warehouse', '2022-01-10', 'active');

-- Sample equipment (showing pattern, not all 50)
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer, model,
                       serial_number, install_date, production_line, status,
                       operating_params) VALUES
(1, 'CNC-A01', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', 'FN-2024-0891',
 '2024-03-15', 'Line 1', 'operational',
 '{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}'),
(1, 'CNC-A02', 'cnc', 'DMG Mori', 'DMU 50', 'DM-2023-4521',
 '2023-09-01', 'Line 1', 'operational',
 '{"max_spindle_rpm": 20000, "tool_capacity": 30, "axes": 5}'),
(1, 'Robot-A01', 'robot', 'ABB', 'IRB 6700', 'ABB-2024-1122',
 '2024-06-10', 'Line 2', 'operational',
 '{"axes": 6, "payload_kg": 150, "reach_mm": 2650}'),
(2, 'CNC-B01', 'cnc', 'FANUC', 'Robodrill a-D21LiB5ADV', 'FN-2024-1205',
 '2024-11-20', 'Line 3', 'operational',
 '{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}');

-- Sensors for CNC-A01 (typical: vibration, temperature, spindle current)
INSERT INTO sensors (equipment_id, name, sensor_type, unit, sampling_rate_hz,
                     min_range, max_range, calibration_date, is_active, tags) VALUES
(1, 'CNC-A01-VIB-X', 'vibration', 'mm/s', 1000, 0, 50,
 '2026-01-15', TRUE, '{"axis": "x", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-VIB-Y', 'vibration', 'mm/s', 1000, 0, 50,
 '2026-01-15', TRUE, '{"axis": "y", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-TEMP-SPINDLE', 'temperature', 'celsius', 1, 10, 85,
 '2026-02-01', TRUE, '{"location": "spindle_bearing"}'),
(1, 'CNC-A01-CURRENT', 'current', 'ampere', 10, 0, 30,
 '2026-02-01', TRUE, '{"phase": "main_spindle"}');

Data Flow

In this plant, the data flow works as follows:

  1. Sensors output analog/digital signals to edge PLCs (Programmable Logic Controllers)
  2. Edge PLCs digitize and publish to an MQTT broker via Sparkplug B protocol
  3. Telegraf agents (one per building) subscribe to MQTT, buffer locally, and forward to the central database
  4. TimescaleDB receives inserts via the Telegraf PostgreSQL output plugin
  5. The ingestion validator (our Python script) runs as a sidecar, monitoring for unknown sensor IDs

At 500 sensors averaging 10 Hz, the system handles approximately 5,000 inserts per second during normal operation, with bursts up to 50,000/s when high-frequency vibration captures are triggered. TimescaleDB on a single node (16 vCPU, 64 GB RAM, NVMe SSD) handles this comfortably with batch inserts.
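A quick sanity check on what those rates mean for the batch inserter, using the batch_size of 1,000 from the ingestion script earlier:

```python
SENSORS = 500
AVG_HZ = 10
BATCH_SIZE = 1_000  # matches the ingester's batch_size

steady_rows_per_s = SENSORS * AVG_HZ                   # 5,000 rows/s
steady_batches_per_s = steady_rows_per_s / BATCH_SIZE  # 5 batch inserts/s
burst_batches_per_s = 50_000 / BATCH_SIZE              # 50 batch inserts/s
```

Even at burst rates, 50 batched inserts per second is well within what a single well-provisioned PostgreSQL/TimescaleDB node can sustain.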

Dashboard Queries

The operations team uses a Grafana dashboard backed by these queries:

-- Dashboard Panel 1: Plant Overview — current status of all equipment
SELECT
    f.name AS building,
    e.name AS machine,
    e.equipment_type,
    e.status AS equipment_status,
    COUNT(DISTINCT s.id) FILTER (WHERE s.is_active) AS active_sensors,
    COUNT(DISTINCT ae.id) FILTER (WHERE ae.severity IN ('high', 'critical')
        AND ae.start_time > NOW() - INTERVAL '24 hours') AS critical_anomalies_24h,
    MAX(ml.performed_at) AS last_maintenance
FROM equipment e
JOIN facilities f ON f.id = e.facility_id
LEFT JOIN sensors s ON s.equipment_id = e.id
LEFT JOIN anomaly_events ae ON ae.sensor_id = s.id
LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
GROUP BY f.name, e.name, e.equipment_type, e.status
ORDER BY critical_anomalies_24h DESC, f.name, e.name;

-- Dashboard Panel 2: Vibration trends for Line 3 CNC machines (last 24h)
SELECT
    time_bucket('15 minutes', r.time) AS period,
    e.name AS machine,
    AVG(r.value) AS avg_vibration,
    MAX(r.value) AS peak_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
  AND e.equipment_type = 'cnc'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY period, e.name
ORDER BY period, e.name;

-- Dashboard Panel 3: Equipment needing attention
-- (sensors exceeding 80% of their max range)
SELECT
    e.name AS machine,
    s.name AS sensor,
    s.sensor_type,
    s.max_range,
    latest.last_value,
    ROUND((latest.last_value / s.max_range * 100)::numeric, 1) AS pct_of_max
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
CROSS JOIN LATERAL (
    SELECT value AS last_value
    FROM sensor_readings
    WHERE sensor_id = s.id
    ORDER BY time DESC
    LIMIT 1
) latest
WHERE s.is_active = TRUE
  AND s.max_range IS NOT NULL
  AND latest.last_value > s.max_range * 0.8
ORDER BY pct_of_max DESC;

Anomaly Detection Integration

When an ML anomaly detection model flags unusual behavior, it writes to the anomaly_events table with full metadata context. A Python worker might look like this:

import logging

import asyncpg

logger = logging.getLogger(__name__)


async def record_anomaly(
    pool: asyncpg.Pool,
    sensor_id: int,
    anomaly_type: str,
    severity: str,
    value_at_detection: float,
    model_version: str,
):
    """Record an anomaly event with metadata validation."""
    async with pool.acquire() as conn:
        # Validate sensor exists and get context for logging
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.max_range,
                   e.name AS equipment, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE s.id = $1
        """, sensor_id)

        if not sensor:
            raise ValueError(f"Sensor {sensor_id} not found in metadata")

        anomaly_id = await conn.fetchval("""
            INSERT INTO anomaly_events
                (sensor_id, start_time, anomaly_type, severity,
                 value_at_detection, model_version)
            VALUES ($1, NOW(), $2, $3, $4, $5)
            RETURNING id
        """, sensor_id, anomaly_type, severity, value_at_detection, model_version)

        logger.warning(
            f"Anomaly #{anomaly_id}: {severity} {anomaly_type} on "
            f"{sensor['equipment']}/{sensor['name']} ({sensor['facility']}) "
            f"value={value_at_detection} (max={sensor['max_range']})"
        )

        return anomaly_id

Common Pitfalls

After reviewing dozens of sensor data architectures, these are the mistakes I see most often:

  • Denormalizing metadata into every time-series row. Impact: 10-20x storage bloat; metadata updates require backfilling billions of rows. Solution: store only sensor_id in the time-series rows and JOIN at query time.
  • No foreign key validation. Impact: orphaned readings accumulate; 10-20% of data becomes unlinkable. Solution: validate sensor_id at ingestion and run periodic quality checks.
  • A single database for everything. Impact: either metadata or time-series queries suffer poor performance. Solution: use TimescaleDB (best of both) or a split architecture.
  • Not planning for sensor changes. Impact: historical data is misinterpreted after recalibration or replacement. Solution: implement SCD Type 2 for sensor history.
  • Ignoring time zones. Impact: time shifts corrupt analysis, especially across multi-site deployments. Solution: always use TIMESTAMPTZ, store in UTC, and convert at display time.
  • Missing indexes on JOIN columns. Impact: cross-domain queries take minutes instead of milliseconds. Solution: index (sensor_id, time DESC) on time-series tables and every foreign key on metadata tables.
  • No retention policy. Impact: storage costs grow without bound and query performance degrades. Solution: tiered retention: raw (30d) → downsampled (1y) → archived (S3).
  • String-based sensor identification. Impact: name changes break links; naming is inconsistent across teams. Solution: use integer IDs as primary keys, with names as human-readable labels.
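The time-zone pitfall is cheap to avoid in application code: keep every timestamp aware and in UTC, and convert only at the display edge. A minimal Python sketch (the viewer's zone here is an arbitrary example):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Ingestion: always store an aware UTC timestamp, never a naive datetime.now()
reading_time = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)

# Display: convert to the viewer's zone only at the edge
local = reading_time.astimezone(ZoneInfo("America/Chicago"))
print(local.isoformat())  # 2024-06-01T07:00:00-05:00
```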


Tip: Run the data quality checks from our ingestion script on a daily schedule. Set alerts for orphaned sensor IDs (readings from sensors not in the metadata registry) and silent sensors (registered sensors with no recent readings). These are early indicators of infrastructure problems.
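Both checks are plain anti-joins. Here is a self-contained sketch using SQLite with a pared-down two-table schema (table and column names follow the article; the one-hour silent-sensor cutoff is an arbitrary example, and production would run the equivalent queries against the real registry):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sensors (id INTEGER PRIMARY KEY, name TEXT, is_active INTEGER);
    CREATE TABLE sensor_readings (sensor_id INTEGER, time TEXT, value REAL);
""")
now = datetime.now(timezone.utc)
conn.execute("INSERT INTO sensors VALUES (1, 'spindle-temp', 1)")
conn.execute("INSERT INTO sensors VALUES (2, 'spindle-vib', 1)")   # registered, no readings
conn.execute("INSERT INTO sensor_readings VALUES (1, ?, 47.3)", (now.isoformat(),))
conn.execute("INSERT INTO sensor_readings VALUES (99, ?, 1.0)", (now.isoformat(),))  # orphan

# Orphaned readings: sensor_id missing from the metadata registry
orphans = conn.execute("""
    SELECT DISTINCT r.sensor_id FROM sensor_readings r
    LEFT JOIN sensors s ON s.id = r.sensor_id
    WHERE s.id IS NULL
""").fetchall()

# Silent sensors: active sensors with no reading in the last hour
cutoff = (now - timedelta(hours=1)).isoformat()
silent = conn.execute("""
    SELECT s.id, s.name FROM sensors s
    LEFT JOIN sensor_readings r ON r.sensor_id = s.id AND r.time > ?
    WHERE s.is_active = 1
    GROUP BY s.id, s.name
    HAVING COUNT(r.sensor_id) = 0
""", (cutoff,)).fetchall()

print(orphans)  # [(99,)]
print(silent)   # [(2, 'spindle-vib')]
```

Wiring these two queries into a daily job with alerting gives you the early-warning signal the tip describes.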

Conclusion

Managing metadata and time-series data together is not a luxury — it is a fundamental requirement for any system that wants to derive actionable insights from sensor data. The sensor_id is the bridge between what your sensors are (metadata) and what they are measuring (time-series), and your architecture must make it trivially easy to cross that bridge in both directions.

For most teams, PostgreSQL with TimescaleDB is the right starting point. You get native SQL JOINs across metadata and time-series tables, a single connection string, familiar tooling, and excellent performance up to terabyte scale. When you outgrow that, the patterns for InfluxDB integration, Parquet data lakes, and TDengine super tables give you a clear upgrade path.

The key design principles to remember:

  • Separate but connected: Metadata in relational tables, time-series in optimized storage, linked by sensor_id
  • Sensor registry: Treat sensors as first-class entities with rich metadata (type, unit, range, calibration, sampling rate)
  • Slowly changing dimensions: Track metadata changes over time so historical data can be correctly interpreted
  • Validate at ingestion: Never insert a time-series reading without confirming the sensor exists in metadata
  • Tiered retention: Raw data (30 days) → downsampled (1 year) → aggregated (forever) → archived (cold storage)
  • Index the bridge: Composite indexes on (sensor_id, time DESC) make cross-domain queries fast

The complete schema, ingestion pipeline, query patterns, and API design in this guide give you a production-ready blueprint. Start with the PostgreSQL + TimescaleDB pattern, add the sensor registry and validation layer, implement continuous aggregates for downsampling, and build your API layer with FastAPI. You will have a system where "show me all vibration anomalies from Building A's CNC machines installed after 2023" is a query that returns results in milliseconds, not a question that leaves your team staring at their screens.
