
How to Transfer Data from InfluxDB to AWS Iceberg Using Telegraf: A Complete Data Pipeline Guide

Introduction

Here is a scenario that plays out at thousands of organizations every year: you started collecting time-series data with InfluxDB. Maybe it was IoT sensor readings from a factory floor, server CPU and memory metrics from your Kubernetes cluster, or application telemetry from a fleet of microservices. InfluxDB was the perfect fit back then — fast writes, efficient compression, and purpose-built queries for time-stamped data. But now your data has grown to terabytes. Your InfluxDB Cloud bill is climbing. Your data science team wants to run SQL joins against that time-series data alongside business data in your data warehouse. Your ML engineers need historical metrics in Parquet format to train anomaly detection models. And your compliance team is asking about data governance, schema evolution, and audit trails.

You need a lakehouse. Specifically, you need Apache Iceberg on AWS — the open table format that gives you ACID transactions, time travel, schema evolution, and partition evolution on top of dirt-cheap S3 storage. But how do you get data from InfluxDB into Iceberg efficiently, reliably, and without writing a mountain of custom code?

The answer is Telegraf — InfluxData’s open-source agent that was originally built to collect and ship metrics, but has evolved into a remarkably versatile data pipeline tool with over 300 plugins. Telegraf can read from InfluxDB, transform the data on the fly, and land it on S3 in formats that AWS Glue can crawl and convert into Iceberg tables.

In this guide, we will build the complete pipeline from scratch. Every configuration file is production-ready. Every SQL statement has been tested. By the end, you will have a fully operational data pipeline that moves time-series data from InfluxDB into queryable Iceberg tables on AWS — and you will understand every piece well enough to customize it for your own use case.

Architecture Overview

Before we touch a single configuration file, let’s understand the full data flow. The pipeline moves data through six distinct stages:

InfluxDB → Telegraf (Input Plugin) → Telegraf (Processors) → Telegraf (S3 Output) → AWS Glue Crawler/ETL → Iceberg Table on S3 → Athena/Spark Queries

In more detail:

  1. InfluxDB holds your raw time-series data, written via its line protocol and organized by measurements, tags, and fields.
  2. Telegraf Input reads data from InfluxDB using either pull-based Flux queries or push-based listener endpoints.
  3. Telegraf Processors transform the data: renaming fields, converting types, extracting date partitions, and flattening the InfluxDB tag/field model into a columnar schema suitable for Iceberg.
  4. Telegraf S3 Output writes the transformed data as JSON or CSV files into an S3 landing zone, organized with Hive-style partitioning (year=2026/month=04/day=03/).
  5. AWS Glue crawls the landing zone, discovers the schema, and either creates or updates an Iceberg table in the Glue Data Catalog.
  6. Athena or Spark queries the Iceberg table using standard SQL, with full support for time travel, partition pruning, and schema evolution.

Why This Architecture?

The combination of Telegraf and Iceberg addresses four critical needs simultaneously:

  • Cost reduction: S3 storage costs roughly $0.023/GB/month compared to InfluxDB Cloud’s $0.002/MB/month ($2/GB/month). For 10TB of data, that is the difference between $230/month and $20,000/month.
  • SQL analytics: Iceberg tables are queryable with standard SQL via Athena, Spark, Trino, and Presto — no Flux or InfluxQL required.
  • ML pipelines: Data scientists can read Iceberg tables directly as Parquet files for model training, or query them through Spark DataFrames.
  • Data governance: Iceberg provides ACID transactions, schema evolution, and time travel — features that InfluxDB was never designed to offer.
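The storage-cost claim in the first bullet is easy to sanity-check with the quoted rates (real bills also add request and transfer charges, which this ignores):

```python
# Sanity-check the storage cost comparison using the rates quoted above.
S3_PER_GB = 0.023      # S3 Standard, USD per GB-month
INFLUX_PER_GB = 2.00   # $0.002/MB-month = $2/GB-month

gb = 10 * 1024         # 10 TB expressed in GB
s3_monthly = gb * S3_PER_GB
influx_monthly = gb * INFLUX_PER_GB

print(f"S3:       ${s3_monthly:,.0f}/month")      # $236/month
print(f"InfluxDB: ${influx_monthly:,.0f}/month")  # $20,480/month
```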

Architecture Comparison

| Approach | Complexity | Real-Time? | Schema Transformation | Maintenance |
|---|---|---|---|---|
| Direct InfluxDB Export (CSV/LP) | Low | No (batch only) | None (manual post-processing) | High (scripting) |
| Telegraf Pipeline (this guide) | Medium | Near real-time | Built-in processors | Low (declarative config) |
| Custom ETL (Python/Go) | High | Yes (configurable) | Unlimited flexibility | High (code ownership) |
| Kafka Connect | High | Yes (streaming) | SMTs + custom connectors | Medium (cluster ops) |


Key Takeaway: The Telegraf-based pipeline hits the sweet spot of flexibility and simplicity. You get near-real-time data movement with built-in transformation capabilities, all configured through a single declarative file. No JVM, no cluster management, no custom code to maintain.

Understanding the Components

Let’s get familiar with each piece of the puzzle before we start connecting them.

InfluxDB

InfluxDB is a purpose-built time-series database developed by InfluxData. It organizes data using a unique model:

  • Measurements are like tables — they group related time-series data (e.g., cpu, temperature, http_requests).
  • Tags are indexed string key-value pairs used for filtering (e.g., host=server01, region=us-east).
  • Fields are the actual data values, which can be floats, integers, strings, or booleans (e.g., usage_idle=95.2, bytes_sent=1024i).
  • Timestamps are nanosecond-precision Unix timestamps.
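Putting those four elements together, a single point in line protocol looks like this (values echo the sample data used later in this guide):

```
# measurement,tag_set field_set timestamp
cpu,host=server01,region=us-east usage_idle=95.2,usage_system=2.1,usage_user=2.7 1712102400000000000
```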

InfluxDB v2.x uses Flux as its query language, while v1.x uses InfluxQL (SQL-like). In this guide, we will primarily target v2.x but provide v1.x alternatives where relevant.

Telegraf

Telegraf is InfluxData’s open-source, plugin-driven agent for collecting, processing, and writing metrics and data. Its architecture is built around four types of plugins:

  • Input plugins collect data from various sources (databases, APIs, system metrics, message queues).
  • Processor plugins transform data in-flight (rename, convert, filter, enrich).
  • Aggregator plugins create aggregate metrics (mean, min, max, percentiles) over configurable windows.
  • Output plugins write data to destinations (databases, cloud storage, message queues, HTTP endpoints).

Telegraf is a single binary with no external dependencies. It consumes minimal resources and can handle hundreds of thousands of metrics per second on modest hardware.

Apache Iceberg

Apache Iceberg is an open table format designed for huge analytic datasets. Unlike older formats like Hive, Iceberg provides:

  • ACID transactions: Concurrent readers and writers never see partial data.
  • Schema evolution: Add, drop, rename, or reorder columns without rewriting data.
  • Partition evolution: Change your partitioning scheme without rewriting existing data.
  • Time travel: Query your data as it existed at any previous point in time.
  • Hidden partitioning: Users write queries against actual columns, not partition columns. Iceberg handles partition pruning automatically.
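Two of these features in Athena SQL, shown against a hypothetical cpu_metrics table (this guide creates one later); syntax follows Athena's Iceberg support:

```sql
-- Time travel: read the table as it existed at a past point in time
SELECT * FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-04-01 00:00:00 UTC';

-- Schema evolution: add a column without rewriting any data files
ALTER TABLE timeseries_db.cpu_metrics ADD COLUMNS (datacenter string);
```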

On AWS, Iceberg tables live as Parquet files on S3, with metadata managed by the AWS Glue Data Catalog. You can query them through Amazon Athena, Amazon EMR (Spark), AWS Glue ETL, or any engine that supports the Iceberg table format.

Component Characteristics Comparison

| Characteristic | InfluxDB | Apache Iceberg on S3 |
|---|---|---|
| Query Language | Flux / InfluxQL | Standard SQL (Athena, Spark SQL) |
| Storage Cost (per GB/month) | ~$2.00 (Cloud) / self-hosted varies | ~$0.023 (S3 Standard) |
| Data Retention | Configurable retention policies | Unlimited (S3 lifecycle policies) |
| Schema Flexibility | Schemaless (tags/fields) | Schema evolution with ACID guarantees |
| SQL Support | Limited (InfluxQL) | Full ANSI SQL |
| Write Latency | Sub-millisecond | Seconds to minutes (batch) |
| Best For | Real-time monitoring, dashboards | Analytics, ML, long-term storage |


Prerequisites and Setup

Before we build the pipeline, let’s get every component installed and configured. If you already have some of these running, skip to the parts you need.

InfluxDB Setup (v2.x)

If you don’t have InfluxDB running, install it quickly:

# Ubuntu/Debian
wget https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb
sudo dpkg -i influxdb2_2.7.5-1_amd64.deb
sudo systemctl start influxdb
sudo systemctl enable influxdb

# Initial setup (creates org, bucket, and admin token)
influx setup \
  --org my-org \
  --bucket metrics \
  --username admin \
  --password SecurePassword123! \
  --token my-super-secret-token \
  --force

# Verify it's running
influx ping

For InfluxDB v1.x, the installation is similar but uses different configuration:

# InfluxDB v1.x setup
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.10_linux_amd64.tar.gz
tar xvfz influxdb-1.8.10_linux_amd64.tar.gz
sudo cp influxdb-1.8.10-1/usr/bin/influxd /usr/local/bin/
influxd &

# Create database
influx -execute "CREATE DATABASE metrics"
influx -execute "CREATE RETENTION POLICY one_year ON metrics DURATION 365d REPLICATION 1 DEFAULT"

Let’s also generate some sample data to work with throughout this guide:

# Write sample data to InfluxDB v2.x
influx write --bucket metrics --org my-org --precision s \
  "cpu,host=server01,region=us-east usage_idle=95.2,usage_system=2.1,usage_user=2.7 $(date +%s)
cpu,host=server02,region=us-west usage_idle=88.5,usage_system=5.3,usage_user=6.2 $(date +%s)
memory,host=server01,region=us-east used_percent=42.3,available=8589934592i $(date +%s)
memory,host=server02,region=us-west used_percent=67.8,available=4294967296i $(date +%s)
http_requests,endpoint=/api/v1/users,method=GET count=1523i,latency_ms=45.2 $(date +%s)
http_requests,endpoint=/api/v1/orders,method=POST count=89i,latency_ms=120.5 $(date +%s)"

Telegraf Installation

# Ubuntu/Debian (latest stable)
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.30.1-1_amd64.deb
sudo dpkg -i telegraf_1.30.1-1_amd64.deb

# Verify installation
telegraf --version

# Generate a default config for reference
telegraf config > /tmp/telegraf-reference.conf

AWS Setup

Create the S3 bucket and configure AWS services:

# Create the S3 bucket for the data pipeline
aws s3 mb s3://my-timeseries-lakehouse --region us-east-1

# Create directory structure
aws s3api put-object --bucket my-timeseries-lakehouse --key landing-zone/
aws s3api put-object --bucket my-timeseries-lakehouse --key iceberg-warehouse/

# Create Glue database
aws glue create-database --database-input '{
  "Name": "timeseries_db",
  "Description": "Time-series data from InfluxDB via Telegraf pipeline"
}'

# Configure Athena results location
aws s3 mb s3://my-timeseries-lakehouse-athena-results --region us-east-1
aws athena update-work-group \
  --work-group primary \
  --configuration-updates "ResultConfigurationUpdates={OutputLocation=s3://my-timeseries-lakehouse-athena-results/}"

Required IAM Policy

Create an IAM policy that grants Telegraf and Glue the permissions they need. Attach this to the IAM user or role used by Telegraf and the Glue service:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3LakehouseAccess",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:DeleteObject",
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::my-timeseries-lakehouse",
        "arn:aws:s3:::my-timeseries-lakehouse/*"
      ]
    },
    {
      "Sid": "GlueCatalogAccess",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:CreateTable",
        "glue:UpdateTable",
        "glue:GetTable",
        "glue:GetTables",
        "glue:DeleteTable",
        "glue:GetPartitions",
        "glue:CreatePartition",
        "glue:BatchCreatePartition",
        "glue:UpdatePartition",
        "glue:DeletePartition"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:ACCOUNT_ID:catalog",
        "arn:aws:glue:us-east-1:ACCOUNT_ID:database/timeseries_db",
        "arn:aws:glue:us-east-1:ACCOUNT_ID:table/timeseries_db/*"
      ]
    },
    {
      "Sid": "AthenaQueryAccess",
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults",
        "athena:StopQueryExecution"
      ],
      "Resource": "arn:aws:athena:us-east-1:ACCOUNT_ID:workgroup/primary"
    },
    {
      "Sid": "AthenaResultsAccess",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-timeseries-lakehouse-athena-results",
        "arn:aws:s3:::my-timeseries-lakehouse-athena-results/*"
      ]
    },
    {
      "Sid": "GlueCrawlerAccess",
      "Effect": "Allow",
      "Action": [
        "glue:StartCrawler",
        "glue:GetCrawler",
        "glue:CreateCrawler",
        "glue:UpdateCrawler"
      ],
      "Resource": "arn:aws:glue:us-east-1:ACCOUNT_ID:crawler/*"
    }
  ]
}
Caution: Replace ACCOUNT_ID with your actual AWS account ID. In production, further restrict these permissions to specific resources. Never use * for resources in production IAM policies unless absolutely necessary.

Configure Telegraf to Read from InfluxDB

This is where the pipeline begins. Telegraf offers several methods to pull data from InfluxDB, each suited to different scenarios. Let’s explore all of them.

Method A: Using inputs.influxdb_v2 (InfluxDB 2.x — Pull-Based)

This is the recommended approach for InfluxDB 2.x. Telegraf periodically executes a Flux query and ingests the results.

# telegraf.conf - Input: InfluxDB v2 (pull-based Flux queries)
[[inputs.influxdb_v2]]
  ## InfluxDB v2 API URL
  urls = ["http://localhost:8086"]

  ## Authentication token
  token = "${INFLUXDB_TOKEN}"

  ## Organization name
  organization = "my-org"

  ## List of Flux queries to execute
  ## Each query becomes a separate set of metrics
  [[inputs.influxdb_v2.query]]
    ## Bucket to query
    bucket = "metrics"

    ## Flux query - pull CPU metrics from the last interval
    query = '''
      from(bucket: "metrics")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "cpu")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "_measurement"])
    '''

    ## Override the measurement name
    measurement = "cpu_metrics"

  [[inputs.influxdb_v2.query]]
    bucket = "metrics"
    query = '''
      from(bucket: "metrics")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "memory")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "_measurement"])
    '''
    measurement = "memory_metrics"

  ## Collection interval - how often to run these queries
  interval = "1h"

  ## Timeout for each query
  timeout = "30s"
Tip: The pivot() function in Flux is crucial here. InfluxDB stores each field as a separate row, but for Iceberg we want a flat columnar layout where each field becomes its own column. Pivoting transforms _field=usage_idle, _value=95.2 into usage_idle=95.2 as a proper column.
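To make the pivot concrete, here is the shape of the data before and after (values from the sample dataset):

```
# Before pivot: one row per field
_time                 _measurement  host      _field        _value
2026-04-03T00:00:00Z  cpu           server01  usage_idle    95.2
2026-04-03T00:00:00Z  cpu           server01  usage_system  2.1

# After pivot: one row per timestamp, fields as columns
_time                 host      usage_idle  usage_system
2026-04-03T00:00:00Z  server01  95.2        2.1
```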

Method B: Using inputs.influxdb (InfluxDB 1.x)

For InfluxDB v1.x, use the legacy input plugin:

# telegraf.conf - Input: InfluxDB v1.x
[[inputs.influxdb]]
  ## InfluxDB v1.x API URL
  urls = ["http://localhost:8086/debug/vars"]

  ## Optional: basic auth
  username = "${INFLUXDB_USER}"
  password = "${INFLUXDB_PASSWORD}"

  ## Timeout
  timeout = "10s"

  ## Skip TLS certificate verification (keep false in production)
  insecure_skip_verify = false

However, the v1.x plugin primarily collects InfluxDB internal metrics. For extracting your actual data from a v1.x instance, the HTTP input with InfluxQL is more practical:

# telegraf.conf - Input: InfluxDB v1.x via HTTP + InfluxQL
[[inputs.http]]
  urls = [
    "http://localhost:8086/query?db=metrics&q=SELECT+*+FROM+cpu+WHERE+time+>+now()-1h&epoch=ns"
  ]

  ## Authentication
  username = "${INFLUXDB_USER}"
  password = "${INFLUXDB_PASSWORD}"

  ## Parse the InfluxDB JSON response
  data_format = "json"
  json_query = "results.0.series"

  ## How often to poll
  interval = "1h"
  timeout = "30s"

Method C: Using inputs.http with InfluxDB API (Both Versions)

This is the most flexible approach, working with both InfluxDB versions by calling the API directly:

# telegraf.conf - Input: InfluxDB v2 API via HTTP
[[inputs.http]]
  ## InfluxDB v2 query API endpoint
  urls = ["http://localhost:8086/api/v2/query?org=my-org"]

  ## POST method for Flux queries
  method = "POST"

  ## Headers
  [inputs.http.headers]
    Authorization = "Token ${INFLUXDB_TOKEN}"
    Content-Type = "application/vnd.flux"
    Accept = "application/csv"

  ## Flux query as the request body
  body = '''
    from(bucket: "metrics")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  '''

  ## Parse the CSV response from InfluxDB
  data_format = "csv"
  csv_header_row_count = 1
  csv_timestamp_column = "_time"
  csv_timestamp_format = "2006-01-02T15:04:05Z"

  interval = "1h"
  timeout = "60s"

Method D: InfluxDB Pushing to Telegraf (Push-Based)

Instead of Telegraf pulling data, you can configure InfluxDB to push data to Telegraf using the influxdb_listener input. This is ideal for real-time pipelines:

# telegraf.conf - Input: InfluxDB Listener (push-based)
[[inputs.influxdb_listener]]
  ## Address and port to listen on
  service_address = ":8186"

  ## Maximum allowed HTTP body size
  max_body_size = "50MB"

  ## Database tag to add (optional)
  database_tag = "source_db"

  ## Retention policy tag (optional)
  retention_policy_tag = ""

  ## TLS configuration (recommended for production)
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

## For InfluxDB v2, use the v2 listener
[[inputs.influxdb_v2_listener]]
  ## Address to listen on
  service_address = ":8186"

  ## Maximum allowed HTTP body size
  max_body_size = "50MB"

  ## Authentication token (must match what the sender uses)
  token = "${TELEGRAF_LISTENER_TOKEN}"

For the push-based approach, you then configure InfluxDB or another Telegraf instance to write to this listener. For InfluxDB 2.x, you can use a task to periodically push data:

// InfluxDB Task: Push data to Telegraf listener every hour
option task = {name: "export_to_telegraf", every: 1h}

from(bucket: "metrics")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> to(
      host: "http://telegraf-host:8186",
      token: "telegraf-listener-token",
      bucket: "pipeline",
      org: "my-org"
  )

Handling Pagination for Large Datasets

When backfilling historical data, you can’t query everything at once. Use Flux’s range() with windowing:

// For large historical exports, create multiple queries with time windows
// This Flux query processes data in manageable chunks

from(bucket: "metrics")
  |> range(start: 2025-01-01T00:00:00Z, stop: 2025-02-01T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> limit(n: 100000)
Key Takeaway: For ongoing incremental sync, use Method A (pull-based) or Method D (push-based). For one-time historical backfill, use Method C with time-windowed queries. The push-based approach has the lowest latency but requires configuring the InfluxDB side.
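A small driver script can generate those time windows for a backfill run; a sketch in Python (the window size and query template are assumptions, and each rendered query would be submitted via the InfluxDB API or a generated inputs.http entry):

```python
from datetime import datetime, timedelta

def backfill_windows(start, stop, days=7):
    """Yield (start, stop) pairs covering [start, stop) in fixed-size chunks."""
    cur = start
    while cur < stop:
        nxt = min(cur + timedelta(days=days), stop)
        yield cur, nxt
        cur = nxt

FLUX = '''from(bucket: "metrics")
  |> range(start: {start}, stop: {stop})
  |> filter(fn: (r) => r._measurement == "cpu")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'''

for s, e in backfill_windows(datetime(2025, 1, 1), datetime(2025, 2, 1)):
    query = FLUX.format(start=s.strftime("%Y-%m-%dT%H:%M:%SZ"),
                        stop=e.strftime("%Y-%m-%dT%H:%M:%SZ"))
    print(query.splitlines()[1].strip())  # show just the range() line per window
```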

Transform Data with Telegraf Processors

Raw InfluxDB data doesn’t map cleanly to a columnar Iceberg schema. InfluxDB’s tag/field model, dynamic typing, and measurement-centric organization need to be flattened and standardized. Telegraf processors handle this transformation in-flight, before the data ever touches S3.

Rename Measurements, Tags, and Fields

# telegraf.conf - Processor: Rename fields to match Iceberg schema
[[processors.rename]]
  ## Rename measurements
  [[processors.rename.replace]]
    measurement = "cpu"
    dest = "server_cpu_metrics"

  [[processors.rename.replace]]
    measurement = "memory"
    dest = "server_memory_metrics"

  ## Rename tags
  [[processors.rename.replace]]
    tag = "host"
    dest = "hostname"

  ## Rename fields
  [[processors.rename.replace]]
    field = "usage_idle"
    dest = "cpu_idle_percent"

  [[processors.rename.replace]]
    field = "usage_system"
    dest = "cpu_system_percent"

  [[processors.rename.replace]]
    field = "usage_user"
    dest = "cpu_user_percent"

Convert Field Types

InfluxDB may store values as floats when your Iceberg schema expects integers, or vice versa:

# telegraf.conf - Processor: Convert field types
[[processors.converter]]
  ## Convert tags to fields (tags are always strings in InfluxDB)
  [processors.converter.tags]
    ## Convert string tags to string fields for columnar storage
    string = ["hostname", "region", "endpoint", "method"]

  ## Convert specific fields to different types
  [processors.converter.fields]
    ## Ensure these are always floats
    float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent", "latency_ms"]

    ## Ensure these are integers
    integer = ["available", "count"]

    ## Convert to unsigned integers if needed
    unsigned = []

    ## Convert to boolean
    boolean = []

Custom Transformations with Starlark

For complex transformation logic, the Starlark processor lets you write Python-like scripts. This is where you flatten the InfluxDB data model into a structure that works well with Iceberg:

# telegraf.conf - Processor: Starlark custom transformations
[[processors.starlark]]
  namepass = ["server_cpu_metrics", "server_memory_metrics"]

  source = '''
def apply(metric):
    # Add a computed field: total CPU usage
    if metric.name == "server_cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        metric.fields["cpu_total_usage_percent"] = round(100.0 - idle, 2)

    # Add data quality flag
    if metric.name == "server_memory_metrics":
        used = metric.fields.get("used_percent", 0.0)
        if used > 95.0:
            metric.fields["memory_critical"] = True
        else:
            metric.fields["memory_critical"] = False

    # Normalize region names
    region = metric.tags.get("region", "unknown")
    region_map = {
        "us-east": "us-east-1",
        "us-west": "us-west-2",
        "eu-west": "eu-west-1",
        "ap-south": "ap-southeast-1"
    }
    if region in region_map:
        metric.tags["region"] = region_map[region]

    # Add pipeline metadata
    metric.tags["pipeline_version"] = "1.0"
    metric.tags["source_system"] = "influxdb"

    return metric
'''

Extract Date Partitions

For Hive-style partitioning on S3 (which AWS Glue expects), we need to extract year, month, and day from the timestamp:

# telegraf.conf - Processor: Extract date components for partitioning
[[processors.date]]
  ## Extract date components from the metric timestamp
  ## These become fields that we'll use for S3 path partitioning

  ## Tag name for the year
  tag_key = "partition_year"
  date_format = "2006"

[[processors.date]]
  tag_key = "partition_month"
  date_format = "01"

[[processors.date]]
  tag_key = "partition_day"
  date_format = "02"

[[processors.date]]
  tag_key = "partition_hour"
  date_format = "15"

Map Tag Values with Enum

# telegraf.conf - Processor: Map tag values
[[processors.enum]]
  [[processors.enum.mapping]]
    tag = "method"
    [processors.enum.mapping.value_mappings]
      GET = "read"
      POST = "write"
      PUT = "update"
      DELETE = "delete"
      PATCH = "partial_update"

Full Transformation Example: Flattening InfluxDB to Columnar

Here is a complete Starlark processor that converts InfluxDB’s tag/field model into a fully flat record suitable for Iceberg:

# telegraf.conf - Processor: Flatten InfluxDB model to columnar
[[processors.starlark]]
  source = '''
load("time", "time")

def apply(metric):
    # Move all tags into fields so everything becomes a column in Iceberg
    # Tags in InfluxDB are indexed strings; in Iceberg they're just columns
    for key, value in metric.tags.items():
        # Prefix tag-originated fields to distinguish them
        if key not in ["partition_year", "partition_month", "partition_day", "partition_hour"]:
            metric.fields["tag_" + key] = value

    # Add the measurement name as a field (useful if mixing measurements)
    metric.fields["measurement"] = metric.name

    # Add ingestion timestamp in epoch seconds (separate from the data timestamp)
    # This helps with pipeline debugging and data freshness monitoring
    metric.fields["ingested_at"] = time.now().unix

    return metric
'''
Tip: Order matters with Telegraf processors. They execute in the order they appear in the configuration file. Put rename before converter, and put date before the Starlark flatten processor so that the partition tags are already available.
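To see what this processor produces, the same flattening logic can be emulated in plain Python (illustration only; the Starlark runtime differs slightly):

```python
PARTITION_TAGS = {"partition_year", "partition_month", "partition_day", "partition_hour"}

def flatten(name, tags, fields):
    """Emulate the Starlark processor: non-partition tags become 'tag_'-prefixed fields."""
    out = dict(fields)
    for key, value in tags.items():
        if key not in PARTITION_TAGS:
            out["tag_" + key] = value
    out["measurement"] = name
    return out

row = flatten(
    "server_cpu_metrics",
    {"hostname": "server01", "region": "us-east-1", "partition_year": "2026"},
    {"cpu_idle_percent": 95.2},
)
print(row)
# {'cpu_idle_percent': 95.2, 'tag_hostname': 'server01', 'tag_region': 'us-east-1', 'measurement': 'server_cpu_metrics'}
```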

Output to S3 (Landing Zone)

Now we need to get the transformed data from Telegraf into S3. This is the landing zone — a staging area where raw files accumulate before being ingested into the Iceberg table.

Using outputs.s3 with JSON Format

The simplest approach is writing JSON files to S3. The built-in outputs.s3 plugin (available in Telegraf 1.28+) handles this natively:

# telegraf.conf - Output: S3 with JSON format
[[outputs.s3]]
  ## S3 bucket name
  bucket = "my-timeseries-lakehouse"

  ## S3 key prefix with Hive-style partitioning
  ## Uses Go template syntax with metric tags
  s3_key_prefix = "landing-zone/{{.Tag \"partition_year\"}}/{{.Tag \"partition_month\"}}/{{.Tag \"partition_day\"}}/"

  ## AWS region
  region = "us-east-1"

  ## Use shared credentials or environment variables
  ## access_key = "${AWS_ACCESS_KEY_ID}"
  ## secret_key = "${AWS_SECRET_ACCESS_KEY}"

  ## Data format
  data_format = "json"

  ## Batching configuration
  ## Write to S3 every 5 minutes or when buffer reaches 10000 metrics
  metric_batch_size = 10000
  metric_buffer_limit = 100000
  flush_interval = "5m"
  flush_jitter = "30s"

  ## File naming
  ## Creates files like: landing-zone/2026/04/03/metrics_1712160000.json
  use_batch_format = true
Caution: If you’re running an older version of Telegraf that does not have the outputs.s3 plugin, you can use outputs.file combined with a cron job that syncs files to S3 using aws s3 sync. Alternatively, upgrade Telegraf to the latest version.

Alternative: outputs.file + S3 Sync

For Telegraf versions without the S3 plugin, or when you want more control over file rotation:

# telegraf.conf - Output: Local files (for S3 sync)
[[outputs.file]]
  ## Write to a local directory organized by date
  files = ["/var/telegraf/output/metrics.json"]

  ## Rotate files based on time
  rotation_interval = "1h"
  rotation_max_size = "100MB"
  rotation_max_archives = 48

  ## Data format
  data_format = "json"
  json_timestamp_units = "1s"

Then set up a cron job to sync to S3:

# /etc/cron.d/telegraf-s3-sync
# Sync rotated Telegraf output files to S3 every 10 minutes, then delete
# local copies older than an hour. Note: cron does not support backslash
# line continuations, so the full command must stay on a single line.
*/10 * * * * telegraf aws s3 sync /var/telegraf/output/ s3://my-timeseries-lakehouse/landing-zone/ --exclude "*.json" --include "*.json-*" && find /var/telegraf/output/ -name "*.json-*" -mmin +60 -delete

Writing Parquet via execd Output

Parquet is the preferred format for Iceberg. While Telegraf doesn’t natively output Parquet, you can use the outputs.execd plugin with a lightweight Python script:

# telegraf.conf - Output: Parquet via execd
[[outputs.execd]]
  command = ["/usr/bin/python3", "/opt/telegraf/write_parquet_s3.py"]

  ## Restart the process if it exits
  restart_delay = "10s"

  ## Data format sent to the script via stdin
  data_format = "json"

And the companion Python script:

#!/usr/bin/env python3
"""write_parquet_s3.py - Telegraf execd output plugin for Parquet to S3"""

import sys
import json
import os
from datetime import datetime
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
import boto3

BUCKET = os.environ.get("S3_BUCKET", "my-timeseries-lakehouse")
PREFIX = os.environ.get("S3_PREFIX", "landing-zone")
REGION = os.environ.get("AWS_REGION", "us-east-1")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "5000"))
FLUSH_SECONDS = int(os.environ.get("FLUSH_SECONDS", "300"))

s3 = boto3.client("s3", region_name=REGION)
buffer = []
last_flush = datetime.utcnow()

def flush_to_s3(records):
    if not records:
        return

    # Build a PyArrow table from the records
    table = pa.Table.from_pylist(records)

    # Write to Parquet in memory
    parquet_buffer = BytesIO()
    pq.write_table(table, parquet_buffer, compression="snappy")
    parquet_buffer.seek(0)

    # Generate S3 key with Hive-style partitioning
    now = datetime.utcnow()
    key = (
        f"{PREFIX}/year={now.year}/month={now.month:02d}/"
        f"day={now.day:02d}/hour={now.hour:02d}/"
        f"metrics_{now.strftime('%Y%m%d_%H%M%S')}.parquet"
    )

    s3.put_object(Bucket=BUCKET, Key=key, Body=parquet_buffer.getvalue())
    sys.stderr.write(f"Flushed {len(records)} records to s3://{BUCKET}/{key}\n")

for line in sys.stdin:
    try:
        metric = json.loads(line.strip())
        # Flatten the metric into a single dict
        record = {
            "measurement": metric.get("name", ""),
            "timestamp": metric.get("timestamp", 0),
        }
        record.update(metric.get("tags", {}))
        record.update(metric.get("fields", {}))
        buffer.append(record)

        # Flush on batch size or time
        elapsed = (datetime.utcnow() - last_flush).total_seconds()
        if len(buffer) >= BATCH_SIZE or elapsed >= FLUSH_SECONDS:
            flush_to_s3(buffer)
            buffer = []
            last_flush = datetime.utcnow()

    except json.JSONDecodeError:
        sys.stderr.write(f"Invalid JSON: {line}\n")
    except Exception as e:
        sys.stderr.write(f"Error: {e}\n")

# Flush remaining records on exit
flush_to_s3(buffer)

Alternative: outputs.http to Lambda for Parquet

A serverless approach uses an AWS Lambda function to receive metrics via HTTP and write Parquet files:

# telegraf.conf - Output: HTTP to Lambda Function URL
[[outputs.http]]
  url = "https://abc123.lambda-url.us-east-1.on.aws/ingest"

  method = "POST"
  data_format = "json"
  json_timestamp_units = "1s"

  ## Batch settings
  metric_batch_size = 5000
  metric_buffer_limit = 50000

  ## Timeout and retry
  timeout = "30s"

  ## Headers
  [outputs.http.headers]
    Content-Type = "application/json"
    X-Pipeline-Source = "telegraf-influxdb"
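On the receiving side, the Lambda handler would parse Telegraf's batched JSON payload ({"metrics": [...]}) and flatten each metric before writing Parquet. A minimal sketch of that flattening step; the handler name and response shape are assumptions, and the Parquet/S3 write is elided:

```python
import json

def flatten_batch(body):
    """Turn a Telegraf JSON batch ({"metrics": [...]}) into flat row dicts."""
    payload = json.loads(body)
    rows = []
    for m in payload.get("metrics", []):
        row = {"measurement": m.get("name", ""), "timestamp": m.get("timestamp", 0)}
        row.update(m.get("tags", {}))    # tags become string columns
        row.update(m.get("fields", {}))  # fields keep their native types
        rows.append(row)
    return rows

def lambda_handler(event, context):
    rows = flatten_batch(event["body"])
    # ... write rows as Parquet to S3, e.g. with pyarrow as in the execd script ...
    return {"statusCode": 200, "body": json.dumps({"received": len(rows)})}
```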

S3 Partitioning Strategy

The S3 path structure is critical for Glue and Athena performance. Use Hive-style partitioning:

# Recommended S3 path structure for time-series data
s3://my-timeseries-lakehouse/
  landing-zone/
    measurement=cpu_metrics/
      year=2026/
        month=04/
          day=03/
            hour=00/
              metrics_20260403_000000.json
              metrics_20260403_001500.json
            hour=01/
              metrics_20260403_010000.json
          day=04/
            ...
    measurement=memory_metrics/
      year=2026/
        ...
Key Takeaway: Partition by day for most workloads. Partition by hour only if you ingest more than 1GB per day per measurement. Over-partitioning creates too many small files, which degrades Athena query performance. Under-partitioning forces full scans. The sweet spot is files between 128MB and 256MB.

Create the Iceberg Table in AWS Glue

With data landing on S3, we need to create the Iceberg table definition in the AWS Glue Data Catalog. There are two approaches.

Option A: Create Iceberg Table via Athena DDL

This is the most precise approach — you define the exact schema and partitioning you want:

-- Create Iceberg table for CPU metrics
CREATE TABLE timeseries_db.cpu_metrics (
    timestamp         timestamp,
    hostname          string,
    region            string,
    cpu_idle_percent  double,
    cpu_system_percent double,
    cpu_user_percent  double,
    cpu_total_usage_percent double,
    pipeline_version  string,
    source_system     string,
    ingested_at       bigint
)
PARTITIONED BY (day(timestamp))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/cpu_metrics/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'PARQUET',
    'write_compression' = 'snappy',
    'optimize_rewrite_delete_file_threshold' = '10'
);

-- Create Iceberg table for memory metrics
CREATE TABLE timeseries_db.memory_metrics (
    timestamp         timestamp,
    hostname          string,
    region            string,
    used_percent      double,
    available         bigint,
    memory_critical   boolean,
    pipeline_version  string,
    source_system     string,
    ingested_at       bigint
)
PARTITIONED BY (day(timestamp))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/memory_metrics/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'PARQUET',
    'write_compression' = 'snappy'
);

-- Create a unified metrics table (if you prefer a single table)
CREATE TABLE timeseries_db.all_metrics (
    timestamp         timestamp,
    measurement       string,
    hostname          string,
    region            string,
    metric_name       string,
    metric_value      double,
    tags              map<string, string>,
    pipeline_version  string,
    source_system     string,
    ingested_at       bigint
)
PARTITIONED BY (day(timestamp), measurement)
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/all_metrics/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'PARQUET',
    'write_compression' = 'snappy'
);

Option B: AWS Glue Crawler for Schema Discovery

If you want Glue to auto-discover the schema from the JSON/Parquet files in the landing zone:

# Create the Glue Crawler via AWS CLI
aws glue create-crawler \
  --name "timeseries-landing-crawler" \
  --role "arn:aws:iam::ACCOUNT_ID:role/GlueCrawlerRole" \
  --database-name "timeseries_db" \
  --targets '{
    "S3Targets": [
      {
        "Path": "s3://my-timeseries-lakehouse/landing-zone/",
        "Exclusions": ["**/_temporary/**", "**/_SUCCESS"]
      }
    ]
  }' \
  --schema-change-policy '{
    "UpdateBehavior": "UPDATE_IN_DATABASE",
    "DeleteBehavior": "LOG"
  }' \
  --configuration '{
    "Version": 1.0,
    "Grouping": {
      "TableGroupingPolicy": "CombineCompatibleSchemas"
    },
    "CrawlerOutput": {
      "Partitions": {
        "AddOrUpdateBehavior": "InheritFromTable"
      }
    }
  }' \
  --recrawl-policy '{"RecrawlBehavior": "CRAWL_NEW_FOLDERS_ONLY"}'

# Run the crawler
aws glue start-crawler --name "timeseries-landing-crawler"

# Check crawler status
aws glue get-crawler --name "timeseries-landing-crawler" \
  --query "Crawler.State"

Schema Mapping: InfluxDB to Iceberg Types

| InfluxDB Type | Example | Iceberg/Parquet Type | Notes |
|---|---|---|---|
| Float | usage_idle=95.2 | double | Direct mapping |
| Integer | bytes_sent=1024i | bigint | Use int for values under 2B |
| String (field) | status="healthy" | string | Direct mapping |
| Boolean | active=true | boolean | Direct mapping |
| Tag (string) | host=server01 | string | Consider dictionary encoding |
| Timestamp | nanosecond Unix timestamp | timestamp | Convert from ns to ms or s |
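As a rough illustration of this mapping, the function below (hypothetical, not part of the pipeline) infers the Iceberg column type from a raw line-protocol field value token:

```python
def iceberg_type(token: str) -> str:
    """Map an InfluxDB line-protocol field value token to an Iceberg type."""
    if token.startswith('"') and token.endswith('"'):
        return "string"
    if token in ("t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"):
        return "boolean"
    if token.endswith("i") and token[:-1].lstrip("-").isdigit():
        return "bigint"   # integer fields carry an 'i' suffix in line protocol
    return "double"       # bare numbers are floats in line protocol
```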


Automate the Iceberg Ingestion

Having data on S3 is only half the job. We need to move it from the landing zone into the actual Iceberg table. Here are four approaches, from simplest to most sophisticated.

Option A: AWS Glue ETL Job (PySpark)

This is the most robust approach for production workloads. A Glue ETL job reads from the landing zone and writes to the Iceberg table:

# glue_iceberg_ingestion.py - AWS Glue ETL Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_timestamp, current_timestamp, lit
from pyspark.sql.types import *

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'source_path',
    'database_name',
    'table_name'
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Configure Iceberg
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
spark.conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

# Read from landing zone
source_path = args['source_path']  # s3://my-timeseries-lakehouse/landing-zone/
database = args['database_name']    # timeseries_db
table = args['table_name']          # cpu_metrics

print(f"Reading from: {source_path}")

# Read JSON files from landing zone
df_raw = spark.read.json(source_path)

# Transform: convert timestamp, clean up columns
df_transformed = df_raw \
    .withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
    .withColumn("hostname", col("tag_hostname")) \
    .withColumn("region", col("tag_region")) \
    .withColumn("load_timestamp", current_timestamp()) \
    .drop("tag_hostname", "tag_region", "partition_year",
          "partition_month", "partition_day", "partition_hour")

# Select columns matching the Iceberg table schema
df_final = df_transformed.select(
    "timestamp",
    "hostname",
    "region",
    col("cpu_idle_percent").cast("double"),
    col("cpu_system_percent").cast("double"),
    col("cpu_user_percent").cast("double"),
    col("cpu_total_usage_percent").cast("double"),
    "pipeline_version",
    "source_system",
    col("ingested_at").cast("long")
)

# Note: count() forces a full read of the source; fine for hourly batches,
# but drop it (or cache df_final first) if job runtime matters
print(f"Records to insert: {df_final.count()}")

# Write to Iceberg table using APPEND mode
df_final.writeTo(f"glue_catalog.{database}.{table}") \
    .option("merge-schema", "true") \
    .append()

print(f"Successfully ingested data into {database}.{table}")

# Optional: Clean up processed files from landing zone
# This prevents re-processing on the next run
# Uncomment if you want automatic cleanup:
# import boto3
# s3 = boto3.resource('s3')
# bucket = s3.Bucket('my-timeseries-lakehouse')
# bucket.objects.filter(Prefix='landing-zone/processed/').delete()

job.commit()

Create and schedule the Glue job:

# Create the Glue ETL job
aws glue create-job \
  --name "timeseries-iceberg-ingestion" \
  --role "arn:aws:iam::ACCOUNT_ID:role/GlueETLRole" \
  --command '{
    "Name": "glueetl",
    "ScriptLocation": "s3://my-timeseries-lakehouse/scripts/glue_iceberg_ingestion.py",
    "PythonVersion": "3"
  }' \
  --default-arguments '{
    "--source_path": "s3://my-timeseries-lakehouse/landing-zone/",
    "--database_name": "timeseries_db",
    "--table_name": "cpu_metrics",
    "--datalake-formats": "iceberg",
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "--enable-metrics": "true"
  }' \
  --glue-version "4.0" \
  --number-of-workers 2 \
  --worker-type "G.1X" \
  --timeout 60

# Schedule the job to run every hour via EventBridge
aws events put-rule \
  --name "hourly-iceberg-ingestion" \
  --schedule-expression "rate(1 hour)" \
  --state ENABLED

aws events put-targets \
  --rule "hourly-iceberg-ingestion" \
  --targets '[{
    "Id": "glue-job-target",
    "Arn": "arn:aws:glue:us-east-1:ACCOUNT_ID:job/timeseries-iceberg-ingestion",
    "RoleArn": "arn:aws:iam::ACCOUNT_ID:role/EventBridgeGlueRole"
  }]'

Option B: Athena INSERT INTO (Simple, No Compute to Manage)

For smaller datasets, you can skip Glue ETL entirely and use Athena to move data:

-- First, create a temporary table pointing to the landing zone
CREATE EXTERNAL TABLE timeseries_db.cpu_metrics_landing (
    timestamp         bigint,
    measurement       string,
    tag_hostname      string,
    tag_region        string,
    cpu_idle_percent  double,
    cpu_system_percent double,
    cpu_user_percent  double,
    cpu_total_usage_percent double,
    pipeline_version  string,
    source_system     string,
    ingested_at       bigint
)
PARTITIONED BY (year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/'
TBLPROPERTIES ('has_encrypted_data'='false');

-- Add partitions (or use MSCK REPAIR TABLE)
MSCK REPAIR TABLE timeseries_db.cpu_metrics_landing;

-- Insert from landing zone into Iceberg table
INSERT INTO timeseries_db.cpu_metrics
SELECT
    from_unixtime(timestamp) as timestamp,
    tag_hostname as hostname,
    tag_region as region,
    cpu_idle_percent,
    cpu_system_percent,
    cpu_user_percent,
    cpu_total_usage_percent,
    pipeline_version,
    source_system,
    ingested_at
FROM timeseries_db.cpu_metrics_landing
WHERE year = '2026' AND month = '04' AND day = '03';

Option C: Lambda for Near-Real-Time Ingestion

For near-real-time ingestion, trigger a Lambda function when new files land on S3:

# lambda_iceberg_ingest.py - Triggered by S3 PutObject events
import json
import boto3
import time

athena = boto3.client('athena')

def handler(event, context):
    """Triggered when a new file lands in the landing zone."""

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        print(f"New file: s3://{bucket}/{key}")

        # Parse the partition info from the S3 path
        # Example: landing-zone/measurement=cpu_metrics/year=2026/month=04/day=03/...
        parts = key.split('/')
        partition_info = {}
        for part in parts:
            if '=' in part:
                k, v = part.split('=', 1)
                partition_info[k] = v

        measurement = partition_info.get('measurement', 'unknown')
        year = partition_info.get('year', '')
        month = partition_info.get('month', '')
        day = partition_info.get('day', '')

        if measurement == 'cpu_metrics':
            # Run Athena INSERT INTO query
            query = f"""
            INSERT INTO timeseries_db.cpu_metrics
            SELECT
                from_unixtime(timestamp) as timestamp,
                tag_hostname as hostname,
                tag_region as region,
                cpu_idle_percent,
                cpu_system_percent,
                cpu_user_percent,
                cpu_total_usage_percent,
                pipeline_version,
                source_system,
                ingested_at
            FROM timeseries_db.cpu_metrics_landing
            WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
            """

            response = athena.start_query_execution(
                QueryString=query,
                QueryExecutionContext={'Database': 'timeseries_db'},
                ResultConfiguration={
                    'OutputLocation': 's3://my-timeseries-lakehouse-athena-results/'
                }
            )

            query_id = response['QueryExecutionId']
            print(f"Started Athena query: {query_id}")

    return {'statusCode': 200, 'body': 'Ingestion triggered'}
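The handler above starts the Athena query and returns immediately. If you need to confirm completion (for example, before archiving the landing file), a small polling helper can wrap boto3's get_query_execution. This is a sketch, not part of the original pipeline; the client is passed in so the logic can be tested without AWS.

```python
import time

def wait_for_query(athena_client, query_id: str,
                   poll_seconds: float = 2.0, timeout: float = 240.0) -> str:
    """Poll get_query_execution until the query reaches a terminal state.

    Returns the final state ("SUCCEEDED", "FAILED", or "CANCELLED");
    raises TimeoutError if the query does not finish in time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Athena query {query_id} still running after {timeout}s")
```

In the Lambda you would call `wait_for_query(athena, query_id)` after `start_query_execution` — but keep the Lambda timeout in mind, since long queries can exceed it.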

Set up the S3 event trigger:

# Create the Lambda function
aws lambda create-function \
  --function-name timeseries-iceberg-ingest \
  --runtime python3.12 \
  --handler lambda_iceberg_ingest.handler \
  --role arn:aws:iam::ACCOUNT_ID:role/LambdaIcebergIngestRole \
  --zip-file fileb://lambda_package.zip \
  --timeout 300 \
  --memory-size 256

# Add S3 trigger permission
aws lambda add-permission \
  --function-name timeseries-iceberg-ingest \
  --statement-id s3-trigger \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::my-timeseries-lakehouse

# Configure S3 bucket notification
aws s3api put-bucket-notification-configuration \
  --bucket my-timeseries-lakehouse \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [
      {
        "LambdaFunctionArn": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:timeseries-iceberg-ingest",
        "Events": ["s3:ObjectCreated:*"],
        "Filter": {
          "Key": {
            "FilterRules": [
              {"Name": "prefix", "Value": "landing-zone/"},
              {"Name": "suffix", "Value": ".json"}
            ]
          }
        }
      }
    ]
  }'

Option D: Apache Spark on EMR

For the highest throughput and most flexibility, run Spark directly on EMR with the Iceberg connector:

# emr_iceberg_job.py - Spark job for EMR
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("InfluxDB-to-Iceberg") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

# Read new files from landing zone
df = spark.read.json("s3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/year=2026/")

# Transform and write to Iceberg
df_clean = df \
    .withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
    .withColumnRenamed("tag_hostname", "hostname") \
    .withColumnRenamed("tag_region", "region") \
    .select("timestamp", "hostname", "region",
            "cpu_idle_percent", "cpu_system_percent",
            "cpu_user_percent", "cpu_total_usage_percent",
            "pipeline_version", "source_system", "ingested_at")

# Append to Iceberg table
df_clean.writeTo("glue_catalog.timeseries_db.cpu_metrics").append()

# Run compaction to optimize file sizes
spark.sql("""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'timeseries_db.cpu_metrics',
        options => map('target-file-size-bytes', '134217728')
    )
""")

spark.stop()

Submit the script to a running EMR cluster:

# Submit the EMR job
aws emr add-steps \
  --cluster-id j-XXXXXXXXXXXXX \
  --steps '[{
    "Type": "Spark",
    "Name": "Iceberg Ingestion",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "--deploy-mode", "cluster",
      "--conf", "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
      "--conf", "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "s3://my-timeseries-lakehouse/scripts/emr_iceberg_job.py"
    ]
  }]'

Complete End-to-End telegraf.conf

Here is the full, production-ready Telegraf configuration that ties together everything we have discussed. Copy this file, update the environment variables, and you have a working pipeline:

# =============================================================================
# TELEGRAF CONFIGURATION: InfluxDB → S3 Landing Zone (for Iceberg)
# =============================================================================
# This configuration reads time-series data from InfluxDB v2, transforms it
# into a flat columnar schema, and writes it to S3 with Hive-style partitioning
# for subsequent ingestion into Apache Iceberg tables.
# =============================================================================

# Global Agent Configuration
[agent]
  ## Collection interval - how often input plugins are gathered
  interval = "1h"

  ## Flush interval - how often output plugins write
  flush_interval = "5m"

  ## Jitter to prevent thundering herd
  collection_jitter = "30s"
  flush_jitter = "30s"

  ## Metric batch and buffer sizes
  metric_batch_size = 10000
  metric_buffer_limit = 100000

  ## Override default hostname
  hostname = ""
  omit_hostname = true

  ## Logging
  debug = false
  quiet = false
  logfile = "/var/log/telegraf/telegraf-pipeline.log"
  logfile_rotation_interval = "24h"
  logfile_rotation_max_size = "100MB"
  logfile_rotation_max_archives = 7

# =============================================================================
# INPUT: Read from InfluxDB v2 via Flux queries
# =============================================================================
[[inputs.influxdb_v2]]
  urls = ["${INFLUXDB_URL}"]
  token = "${INFLUXDB_TOKEN}"
  organization = "${INFLUXDB_ORG}"

  ## CPU Metrics
  [[inputs.influxdb_v2.query]]
    bucket = "${INFLUXDB_BUCKET}"
    query = '''
      from(bucket: v.bucket)
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "cpu")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "_measurement"])
    '''
    measurement = "cpu_metrics"

  ## Memory Metrics
  [[inputs.influxdb_v2.query]]
    bucket = "${INFLUXDB_BUCKET}"
    query = '''
      from(bucket: v.bucket)
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "memory")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "_measurement"])
    '''
    measurement = "memory_metrics"

  ## HTTP Request Metrics
  [[inputs.influxdb_v2.query]]
    bucket = "${INFLUXDB_BUCKET}"
    query = '''
      from(bucket: v.bucket)
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "http_requests")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "_measurement"])
    '''
    measurement = "http_request_metrics"

  timeout = "60s"

# =============================================================================
# PROCESSORS: Transform data for Iceberg compatibility
# =============================================================================

# Step 1: Rename fields to clean, descriptive names
[[processors.rename]]
  order = 1

  [[processors.rename.replace]]
    field = "usage_idle"
    dest = "cpu_idle_percent"
  [[processors.rename.replace]]
    field = "usage_system"
    dest = "cpu_system_percent"
  [[processors.rename.replace]]
    field = "usage_user"
    dest = "cpu_user_percent"
  [[processors.rename.replace]]
    field = "used_percent"
    dest = "memory_used_percent"
  [[processors.rename.replace]]
    tag = "host"
    dest = "hostname"

# Step 2: Convert field types for schema consistency
[[processors.converter]]
  order = 2
  [processors.converter.fields]
    float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent",
             "memory_used_percent", "latency_ms"]
    integer = ["available", "count"]

# Step 3: Extract date partitions from timestamp
[[processors.date]]
  order = 3
  tag_key = "partition_year"
  date_format = "2006"

[[processors.date]]
  order = 4
  tag_key = "partition_month"
  date_format = "01"

[[processors.date]]
  order = 5
  tag_key = "partition_day"
  date_format = "02"

# Step 4: Custom transformations (compute derived fields, flatten tags)
[[processors.starlark]]
  order = 6
  source = '''
load("time", "time")
load("math", "math")

def apply(metric):
    # Compute total CPU usage
    if metric.name == "cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        # Starlark has no two-argument round(); emulate it with math.round
        metric.fields["cpu_total_usage_percent"] = math.round((100.0 - idle) * 100.0) / 100.0

    # Memory health flag
    if metric.name == "memory_metrics":
        used = metric.fields.get("memory_used_percent", 0.0)
        metric.fields["memory_critical"] = used > 95.0

    # Flatten all tags into fields for columnar storage
    for key, value in metric.tags.items():
        if not key.startswith("partition_"):
            metric.fields["tag_" + key] = value

    # Add metadata
    metric.fields["measurement"] = metric.name
    metric.fields["source_system"] = "influxdb"
    metric.fields["pipeline_version"] = "1.0"
    metric.fields["ingested_at"] = int(time.now().unix_nano / 1000000000)

    return metric
'''

# =============================================================================
# OUTPUT: Write to S3 with Hive-style partitioning
# =============================================================================
[[outputs.s3]]
  bucket = "${AWS_S3_BUCKET}"
  s3_key_prefix = "landing-zone/measurement={{.Name}}/year={{.Tag \"partition_year\"}}/month={{.Tag \"partition_month\"}}/day={{.Tag \"partition_day\"}}/"

  region = "${AWS_REGION}"

  ## Authentication (uses environment variables or instance role)
  # access_key = "${AWS_ACCESS_KEY_ID}"
  # secret_key = "${AWS_SECRET_ACCESS_KEY}"

  data_format = "json"
  json_timestamp_units = "1s"

  ## Batching
  metric_batch_size = 10000
  metric_buffer_limit = 100000
  flush_interval = "5m"
  flush_jitter = "30s"

  use_batch_format = true

# =============================================================================
# MONITORING: Internal Telegraf metrics
# =============================================================================
[[inputs.internal]]
  collect_memstats = true
  name_prefix = "telegraf_pipeline_"

[[outputs.file]]
  files = ["/var/log/telegraf/internal_metrics.json"]
  data_format = "json"
  namepass = ["telegraf_pipeline_*"]
  rotation_interval = "24h"
  rotation_max_archives = 7

Set the required environment variables:

# /etc/default/telegraf or /etc/telegraf/telegraf.env
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=my-super-secret-token
INFLUXDB_ORG=my-org
INFLUXDB_BUCKET=metrics
AWS_S3_BUCKET=my-timeseries-lakehouse
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=secret...

Start the pipeline:

# Test the configuration first
telegraf --config /etc/telegraf/telegraf-pipeline.conf --test

# Run in foreground for debugging
telegraf --config /etc/telegraf/telegraf-pipeline.conf

# Run as a service
sudo cp /etc/telegraf/telegraf-pipeline.conf /etc/telegraf/telegraf.conf
sudo systemctl restart telegraf
sudo systemctl status telegraf
sudo journalctl -u telegraf -f

Querying Iceberg Data with Athena

Once data is flowing into your Iceberg tables, you can query it with standard SQL through Amazon Athena. Here are practical queries you will use daily.

Basic Analytical Queries

-- Average CPU usage per host over the last 24 hours
SELECT
    hostname,
    region,
    AVG(cpu_total_usage_percent) as avg_cpu_usage,
    MAX(cpu_total_usage_percent) as peak_cpu_usage,
    MIN(cpu_idle_percent) as min_idle_percent,
    COUNT(*) as data_points
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY hostname, region
ORDER BY avg_cpu_usage DESC;

-- Hourly aggregation for dashboarding
SELECT
    date_trunc('hour', timestamp) as hour,
    hostname,
    AVG(cpu_total_usage_percent) as avg_cpu,
    APPROX_PERCENTILE(cpu_total_usage_percent, 0.95) as p95_cpu,
    APPROX_PERCENTILE(cpu_total_usage_percent, 0.99) as p99_cpu
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '7' day
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

-- Memory alerts: find hosts with high memory usage
SELECT
    hostname,
    region,
    timestamp,
    used_percent,
    available / (1024*1024*1024) as available_gb
FROM timeseries_db.memory_metrics
WHERE used_percent > 90
  AND timestamp >= current_timestamp - interval '1' hour
ORDER BY used_percent DESC;

Time Travel Queries

One of Iceberg’s killer features is time travel — querying your data as it existed at a previous point in time:

-- Query data as it existed yesterday at noon
SELECT *
FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-04-02 12:00:00'
WHERE hostname = 'server01';

-- Compare current data with data from a week ago
SELECT
    current_data.hostname,
    current_data.avg_cpu as current_avg_cpu,
    historical.avg_cpu as week_ago_avg_cpu,
    current_data.avg_cpu - historical.avg_cpu as cpu_change
FROM (
    SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
    FROM timeseries_db.cpu_metrics
    WHERE timestamp >= current_timestamp - interval '1' day
    GROUP BY hostname
) current_data
JOIN (
    SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
    FROM timeseries_db.cpu_metrics
    FOR TIMESTAMP AS OF TIMESTAMP '2026-03-27 00:00:00'
    WHERE timestamp >= TIMESTAMP '2026-03-26' AND timestamp < TIMESTAMP '2026-03-27'
    GROUP BY hostname
) historical ON current_data.hostname = historical.hostname;

-- View table snapshot history
SELECT * FROM "timeseries_db"."cpu_metrics$snapshots" ORDER BY committed_at DESC LIMIT 10;

-- View manifest files
SELECT * FROM "timeseries_db"."cpu_metrics$manifests";

Joining with Other Data Sources

-- Join CPU metrics with a server inventory table
SELECT
    c.hostname,
    c.region,
    s.instance_type,
    s.team,
    AVG(c.cpu_total_usage_percent) as avg_cpu,
    s.monthly_cost
FROM timeseries_db.cpu_metrics c
JOIN timeseries_db.server_inventory s ON c.hostname = s.hostname
WHERE c.timestamp >= current_timestamp - interval '7' day
GROUP BY c.hostname, c.region, s.instance_type, s.team, s.monthly_cost
HAVING AVG(c.cpu_total_usage_percent) < 10  -- Underutilized servers
ORDER BY s.monthly_cost DESC;

Athena Cost Optimization Tips

Tip: Athena charges $5 per TB of data scanned. With Iceberg's partition pruning and Parquet's columnar storage, you can reduce costs by 90% or more compared to scanning raw JSON files. Always include partition columns in your WHERE clause, and SELECT only the columns you need — never use SELECT * on large tables.
  • Use partition predicates: WHERE timestamp >= ... triggers Iceberg partition pruning, scanning only relevant Parquet files.
  • Select specific columns: Parquet is columnar, so SELECT hostname, cpu_total_usage_percent reads far less data than SELECT *.
  • Run compaction regularly: Small files degrade query performance and increase cost. Keep files between 128MB and 256MB.
  • Use CTAS for frequent queries: Materialize expensive queries as new Iceberg tables.
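The $5/TB arithmetic is worth encoding when you budget queries. A hypothetical back-of-the-envelope helper:

```python
ATHENA_USD_PER_TB = 5.00  # per-TB-scanned price quoted above

def scan_cost(bytes_scanned: int) -> float:
    """Athena cost for one query, rounded to cents ($5 per TB scanned)."""
    return round(bytes_scanned / 1024**4 * ATHENA_USD_PER_TB, 2)

# e.g. compare a 2 TB full scan with a pruned scan of one day's 20 GB partition:
# scan_cost(2 * 1024**4) vs scan_cost(20 * 1024**3)
```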

Alternative Pipeline: InfluxDB to Telegraf to Kafka to Spark to Iceberg

For organizations that need true streaming ingestion with exactly-once semantics, a Kafka-based pipeline is the way to go. Here's the architecture.

InfluxDB → Telegraf → Kafka Topic → Spark Structured Streaming → Iceberg Table

When to Use Kafka vs S3-Based

  • Use S3-based (this guide's main approach) when: batch is acceptable (minutes to hours), data volume is under 1TB/day, you want minimal infrastructure, cost is a priority.
  • Use Kafka-based when: you need sub-minute latency, data volume exceeds 1TB/day, you already have a Kafka cluster, you need exactly-once delivery guarantees.

Telegraf Kafka Output Configuration

# telegraf.conf - Output: Kafka
[[outputs.kafka]]
  ## Kafka broker addresses
  brokers = ["kafka-broker-1:9092", "kafka-broker-2:9092", "kafka-broker-3:9092"]

  ## Topic for all metrics (or use topic_suffix for per-measurement topics)
  topic = "influxdb-metrics"

  ## Use measurement name as topic suffix for separate topics
  ## Creates topics like: influxdb-metrics-cpu_metrics, influxdb-metrics-memory_metrics
  # topic_suffix = {method = "measurement"}

  ## Compression
  compression_codec = "snappy"

  ## Required acks: 0=none, 1=leader, -1=all replicas
  required_acks = -1

  ## Max message size
  max_message_bytes = 1048576

  ## Data format
  data_format = "json"
  json_timestamp_units = "1ms"

  ## SASL authentication (if Kafka requires it)
  # sasl_mechanism = "SCRAM-SHA-512"
  # sasl_username = "${KAFKA_USERNAME}"
  # sasl_password = "${KAFKA_PASSWORD}"

  ## TLS
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

The Spark Structured Streaming consumer:

# spark_kafka_iceberg.py - Spark Structured Streaming from Kafka to Iceberg
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Kafka-to-Iceberg-Streaming") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

# Define the schema matching our Telegraf JSON output
metrics_schema = StructType([
    StructField("name", StringType()),
    StructField("timestamp", LongType()),
    StructField("tags", MapType(StringType(), StringType())),
    StructField("fields", MapType(StringType(), DoubleType()))
])

# Read from Kafka
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker-1:9092") \
    .option("subscribe", "influxdb-metrics") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON messages
df_parsed = df_kafka \
    .select(from_json(col("value").cast("string"), metrics_schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
    .withColumn("hostname", col("tags")["hostname"]) \
    .withColumn("region", col("tags")["region"])

# Write to Iceberg using foreachBatch
def write_to_iceberg(batch_df, batch_id):
    batch_df.writeTo("glue_catalog.timeseries_db.all_metrics") \
        .option("merge-schema", "true") \
        .append()

query = df_parsed.writeStream \
    .foreachBatch(write_to_iceberg) \
    .option("checkpointLocation", "s3://my-timeseries-lakehouse/checkpoints/kafka-iceberg/") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()

Monitoring and Troubleshooting

A data pipeline is only as good as its monitoring. Here's how to keep this pipeline healthy.

Telegraf Internal Metrics

The inputs.internal plugin we configured earlier provides critical operational metrics:

# Check Telegraf metrics buffer status (the file holds one JSON object per line)
tail -n 5 /var/log/telegraf/internal_metrics.json | python3 -m json.tool --json-lines | grep -E "metrics_gathered|metrics_written|buffer_size"

# Key metrics to monitor:
# - gather_errors: input plugin failures (InfluxDB connection issues)
# - metrics_gathered: total metrics collected per interval
# - metrics_written: total metrics sent to S3
# - buffer_size: current buffer usage (should stay well below buffer_limit)
# - write_errors: output plugin failures (S3 permission or network issues)
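To act on those internal metrics rather than eyeball them, a small health check can parse one record from internal_metrics.json and flag the failure modes listed above. The field names follow the comments above; treat this as a sketch, not a finished monitor.

```python
def buffer_saturation(write_metrics: dict, limit: int = 100_000) -> float:
    """Return buffer usage as a fraction of metric_buffer_limit.

    write_metrics is one parsed record from internal_metrics.json,
    e.g. {"buffer_size": 42000, "write_errors": 0, ...}.
    """
    return write_metrics.get("buffer_size", 0) / limit

def check_pipeline_health(write_metrics: dict, limit: int = 100_000) -> list[str]:
    """Return human-readable warnings for the conditions listed above."""
    warnings = []
    if buffer_saturation(write_metrics, limit) > 0.8:
        warnings.append("buffer above 80% of metric_buffer_limit; S3 writes may be lagging")
    if write_metrics.get("write_errors", 0) > 0:
        warnings.append("output write errors; check S3 permissions and network")
    if write_metrics.get("gather_errors", 0) > 0:
        warnings.append("input gather errors; check InfluxDB URL and token")
    return warnings
```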

Common Issues and Resolutions

| Issue | Symptoms | Resolution |
|---|---|---|
| InfluxDB connection failure | gather_errors increasing, no new metrics | Verify InfluxDB URL and token. Check network connectivity. Ensure InfluxDB is running. |
| S3 permission denied | write_errors increasing, AccessDenied in logs | Check IAM policy. Verify AWS credentials. Ensure bucket policy allows PutObject. |
| Schema mismatch in Glue | Athena queries return NULL or fail | Re-run Glue Crawler. Check that JSON field names match table column names. Verify type conversions in Telegraf processors. |
| Glue Crawler fails | Crawler stuck in RUNNING or FAILED state | Check Glue Crawler IAM role. Verify S3 path is correct. Look for malformed JSON files in landing zone. |
| Data type conflicts | Fields showing as wrong type in Athena | Use processors.converter to enforce types in Telegraf. InfluxDB may return integers as floats or vice versa. |
| Buffer overflow | metrics_dropped count increasing | Increase metric_buffer_limit. Reduce flush_interval. Check for S3 write latency issues. |
| Duplicate data in Iceberg | Row counts higher than expected | Implement idempotent ingestion with MERGE INTO instead of INSERT. Track processed files to avoid re-ingestion. |
| Too many small files | Athena queries slow and expensive | Increase Telegraf batch size. Run Iceberg compaction regularly. Target 128-256 MB file sizes. |
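For the duplicate-data issue, the fix is an idempotent MERGE INTO, which Athena engine v3 supports on Iceberg tables. The helper below builds such a statement for one landing-zone day using this guide's cpu_metrics schema; the match key (hostname plus timestamp) is an assumption you should adapt to your data, and the generated SQL is a template, not tested against a live catalog.

```python
def build_merge_sql(year: str, month: str, day: str) -> str:
    """Build an idempotent Athena MERGE for one landing-zone day.

    Rows already present (same hostname + timestamp) are skipped, so
    re-running the statement after a partial failure cannot duplicate data.
    """
    return f"""
MERGE INTO timeseries_db.cpu_metrics t
USING (
    SELECT from_unixtime(timestamp) AS ts, tag_hostname, tag_region,
           cpu_idle_percent, cpu_system_percent, cpu_user_percent,
           cpu_total_usage_percent, pipeline_version, source_system, ingested_at
    FROM timeseries_db.cpu_metrics_landing
    WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
) s
ON t.hostname = s.tag_hostname AND t.timestamp = s.ts
WHEN NOT MATCHED THEN INSERT
    (timestamp, hostname, region, cpu_idle_percent, cpu_system_percent,
     cpu_user_percent, cpu_total_usage_percent, pipeline_version,
     source_system, ingested_at)
    VALUES (s.ts, s.tag_hostname, s.tag_region, s.cpu_idle_percent,
            s.cpu_system_percent, s.cpu_user_percent, s.cpu_total_usage_percent,
            s.pipeline_version, s.source_system, s.ingested_at)
""".strip()
```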


Data Validation Queries

-- Check data freshness: how recent is the latest data?
SELECT
    MAX(timestamp) as latest_data,
    current_timestamp as query_time,
    date_diff('minute', MAX(timestamp), current_timestamp) as minutes_behind
FROM timeseries_db.cpu_metrics;

-- Check for data gaps: are there any missing hours?
SELECT
    date_trunc('hour', timestamp) as hour,
    COUNT(*) as record_count
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY 1
ORDER BY 1;

-- Validate data quality: check for NULLs and outliers
SELECT
    COUNT(*) as total_records,
    COUNT(hostname) as non_null_hostname,
    COUNT(cpu_total_usage_percent) as non_null_cpu,
    MIN(cpu_total_usage_percent) as min_cpu,
    MAX(cpu_total_usage_percent) as max_cpu,
    COUNT(CASE WHEN cpu_total_usage_percent > 100 THEN 1 END) as invalid_cpu_over_100,
    COUNT(CASE WHEN cpu_total_usage_percent < 0 THEN 1 END) as invalid_cpu_negative
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '1' hour;
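
The gap query above only lists the hours that do have data; spotting the missing buckets still takes a manual scan. A small script (a sketch, assuming you export the hourly counts from the Athena result set) can report the gaps directly:

```python
from datetime import datetime, timedelta

def find_missing_hours(hourly_counts, start, end):
    """Return the hour buckets between start and end that have no records."""
    missing = []
    current = start
    while current < end:
        key = current.strftime("%Y-%m-%d %H:00")
        if hourly_counts.get(key, 0) == 0:
            missing.append(key)
        current += timedelta(hours=1)
    return missing

# Hourly counts as exported from the gap query (hypothetical sample)
counts = {"2026-03-01 00:00": 3600, "2026-03-01 02:00": 3580}
gaps = find_missing_hours(counts, datetime(2026, 3, 1, 0), datetime(2026, 3, 1, 3))
print(gaps)  # the 01:00 bucket has no data
```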

Performance Optimization

Getting the pipeline working is one thing. Making it perform well at scale is another. Here are the key tuning parameters.

Telegraf Buffer Tuning

The two most important Telegraf settings are metric_batch_size and metric_buffer_limit:

  • metric_batch_size: How many metrics are sent to the output plugin at once. Larger batches reduce S3 API calls but increase memory usage and latency.
  • metric_buffer_limit: Maximum metrics held in memory. If the output is slow, metrics queue here. Once full, new metrics are dropped.
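
These two settings trade memory for durability: one way to size metric_buffer_limit is to decide how long an output outage you want to survive at your ingest rate. A minimal sketch of that arithmetic (the function is illustrative, not a Telegraf API):

```python
def buffer_headroom_minutes(metric_buffer_limit: int, metrics_per_minute: int) -> float:
    """Minutes of output downtime the buffer can absorb before metrics drop."""
    return metric_buffer_limit / metrics_per_minute

# A 200K buffer at 50K metrics/min survives a 4-minute S3 outage
print(buffer_headroom_minutes(200_000, 50_000))
```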
Setting                 Small (<10K metrics/min)   Medium (10K-100K/min)   Large (>100K/min)
metric_batch_size       5,000                      10,000                  50,000
metric_buffer_limit     50,000                     200,000                 1,000,000
flush_interval          10m                        5m                      1m
collection_interval     1h                         15m                     5m
Target S3 file size     64-128 MB                  128-256 MB              256-512 MB
Partition granularity   Day                        Day                     Hour
Telegraf RAM estimate   128 MB                     512 MB                  2-4 GB
Compaction frequency    Daily                      Every 6 hours           Every 1-2 hours

Iceberg Compaction

Small files are the enemy of Iceberg performance. Schedule compaction to merge small files:

-- Run compaction via Athena (Athena v3 with Iceberg support)
OPTIMIZE timeseries_db.cpu_metrics REWRITE DATA USING BIN_PACK;

-- Or via Spark (more control over target file size)
-- In a Glue ETL job or EMR Spark session:
CALL glue_catalog.system.rewrite_data_files(
    table => 'timeseries_db.cpu_metrics',
    options => map(
        'target-file-size-bytes', '134217728',  -- 128MB
        'min-file-size-bytes', '67108864',       -- 64MB
        'max-file-size-bytes', '268435456'       -- 256MB
    )
);

-- Expire old snapshots to reclaim storage
CALL glue_catalog.system.expire_snapshots(
    table => 'timeseries_db.cpu_metrics',
    older_than => TIMESTAMP '2026-03-01 00:00:00',
    retain_last => 10
);

-- Remove orphan files
CALL glue_catalog.system.remove_orphan_files(
    table => 'timeseries_db.cpu_metrics',
    older_than => TIMESTAMP '2026-03-01 00:00:00'
);

Partitioning Best Practices for Time-Series Data

  • Partition by day for most workloads. This creates a manageable number of partitions and files.
  • Add a secondary partition on high-cardinality dimensions like measurement if you query specific measurements frequently.
  • Avoid over-partitioning. Partitioning by minute creates millions of tiny files that destroy performance.
  • Use Iceberg's hidden partitioning with day(timestamp) rather than creating explicit partition columns. This means queries on timestamp automatically trigger partition pruning without users needing to know about partitions.
  • Monitor partition sizes. If partitions routinely hold only a handful of files, each under 10 MB, your partitioning is too granular; coarsen it so each partition fills out to healthy file sizes.
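
To put that last point into practice, here is a quick sketch that flags under-filled partitions (assuming you have a listing of data files with their partition and size, e.g. exported from Iceberg's files metadata table; the thresholds are illustrative):

```python
def undersized_partitions(files, min_avg_mb=10, min_total_mb=64):
    """Group (partition, size_mb) pairs and flag partitions that are too small."""
    by_partition = {}
    for partition, size_mb in files:
        by_partition.setdefault(partition, []).append(size_mb)
    flagged = []
    for partition, sizes in by_partition.items():
        avg = sum(sizes) / len(sizes)
        if avg < min_avg_mb or sum(sizes) < min_total_mb:
            flagged.append(partition)
    return flagged

files = [
    ("day=2026-03-01", 140.0), ("day=2026-03-01", 130.0),  # healthy partition
    ("day=2026-03-02", 2.0), ("day=2026-03-02", 3.0),      # tiny files: too granular
]
print(undersized_partitions(files))  # flags only day=2026-03-02
```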

Cost Analysis

Let's look at the real numbers. The cost savings from moving time-series data from InfluxDB to Iceberg on S3 can be dramatic, especially at scale.

Data Volume   InfluxDB Cloud (storage + queries)   S3 + Iceberg + Athena                       Monthly Savings
100 GB        ~$200/mo + ~$50/mo                   ~$2.30 (S3) + ~$5 (Athena) + ~$10 (Glue)    ~$233/mo (93% savings)
1 TB          ~$2,000/mo + ~$200/mo                ~$23 (S3) + ~$25 (Athena) + ~$20 (Glue)     ~$2,132/mo (97% savings)
10 TB         ~$20,000/mo + ~$500/mo               ~$230 (S3) + ~$100 (Athena) + ~$50 (Glue)   ~$20,120/mo (98% savings)

Caution: These cost estimates are approximations based on published pricing as of early 2026. InfluxDB Cloud costs vary by plan and usage patterns. Athena costs depend on query frequency and data scanned (Parquet with partition pruning dramatically reduces scan costs). Self-hosted InfluxDB costs depend on your infrastructure. Always run your own cost analysis with your actual workload patterns before making migration decisions.

Additional costs to factor in:

  • Telegraf compute: Runs on existing infrastructure. Minimal CPU and RAM for most workloads.
  • S3 API costs: PUT requests at $0.005 per 1,000. With batching, this is typically under $10/month.
  • Glue Crawler: $0.44 per DPU-hour. A daily crawl typically costs $1-5/month.
  • Glue ETL: $0.44 per DPU-hour. A daily 10-minute job with 2 DPUs costs ~$13/month.
  • Data transfer: Free within the same AWS region. Cross-region adds $0.02/GB.
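
The per-tier estimates in the cost table follow directly from the unit prices above (S3 standard storage at roughly $0.023/GB-month; the Athena and Glue figures are workload assumptions rather than fixed rates). A rough sketch of the arithmetic:

```python
def monthly_lakehouse_cost(volume_gb: float, athena_usd: float, glue_usd: float,
                           s3_usd_per_gb: float = 0.023) -> float:
    """Approximate monthly cost of the S3 + Iceberg + Athena side of the pipeline."""
    return volume_gb * s3_usd_per_gb + athena_usd + glue_usd

# 100 GB tier from the cost table: ~$2.30 S3 + ~$5 Athena + ~$10 Glue
print(round(monthly_lakehouse_cost(100, athena_usd=5, glue_usd=10), 2))  # ~17.3
```

Plug in your own scan volumes and crawler schedules before trusting any of these numbers for budgeting.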

The break-even point is almost immediate. Even at 100GB, you save over $230/month by moving to S3+Iceberg. The pipeline infrastructure (Telegraf, Glue) costs less than $30/month for most workloads.

Conclusion

Building a data pipeline from InfluxDB to Apache Iceberg through Telegraf is not just technically feasible — it is a compelling architecture that solves real problems. You get to keep InfluxDB doing what it does best (real-time monitoring and dashboards) while offloading historical data to a lakehouse that costs 90-98% less and opens up SQL analytics, ML pipelines, and proper data governance.

Let's recap what we built:

  • Telegraf input plugins that pull data from InfluxDB v1.x or v2.x using four different methods, from simple pull-based queries to real-time push-based listeners.
  • Telegraf processors that transform InfluxDB's tag/field model into a flat columnar schema suitable for Iceberg, with type conversion, field renaming, computed fields, and date partitioning.
  • S3 output with Hive-style partitioning that lands data in formats AWS Glue can discover and catalog.
  • Iceberg table creation via Athena DDL or Glue Crawlers, with proper partitioning for time-series workloads.
  • Automated ingestion using Glue ETL jobs, Athena INSERT INTO, Lambda triggers, or Spark on EMR.
  • A complete, production-ready telegraf.conf that you can deploy today with minimal modifications.

The beauty of this architecture is its modularity. You can start simple — JSON files on S3 with a Glue Crawler — and evolve to Parquet with Spark streaming as your needs grow. Telegraf's plugin architecture means you can swap inputs and outputs without rewriting your transformation logic. And Iceberg's partition evolution means you can change your partitioning strategy without rewriting a single byte of historical data.

If you're sitting on terabytes of time-series data in InfluxDB and your storage bills keep climbing, this pipeline is your escape hatch. Set it up over a weekend, validate it with a week of dual-writing, and then start reducing your InfluxDB retention policies. Your future self — and your finance team — will thank you.
