
Complex Event Processing with Apache Flink: Building Real-Time CEP Pipelines from Scratch

A single credit card gets swiped at a gas station in Houston at 2:13 PM. Forty seconds later, the same card number appears at an electronics store in Tokyo. Within those forty seconds, your system needs to ingest both events, correlate them across millions of concurrent transaction streams, recognize the physical impossibility, and fire a fraud alert — all before the Tokyo merchant finishes printing the receipt. This is not a hypothetical scenario. Visa's network is built to handle more than 65,000 transaction messages per second, and fraudsters are getting faster every year. Traditional batch jobs that run overnight are worthless here. You need Complex Event Processing, and Apache Flink is the best engine to build it on.

In this guide, we are going to build real-time CEP pipelines from scratch. Not toy examples — complete, compilable Java code that you can adapt for production fraud detection, IoT monitoring, and financial market analysis. By the end, you will understand Flink’s CEP library deeply enough to design your own pattern-matching pipelines for any domain.

What is Complex Event Processing (CEP)?

Complex Event Processing is a methodology for detecting meaningful patterns across streams of events in real time. The key word is patterns. Simple stream processing might filter or transform individual events — “give me all transactions over $1,000.” CEP goes further: it looks for sequences, combinations, and temporal relationships between multiple events.

Simple Events vs Complex Events

A simple event is a single, atomic occurrence: a temperature reading, a stock trade, a log entry. A complex event is a higher-level pattern derived from multiple simple events. For example:

  • Simple event: “User #4821 made a $50 purchase at Starbucks.”
  • Complex event: “User #4821 made three purchases totaling over $2,000 within five minutes from three different countries.” This complex event only exists because a CEP engine recognized the pattern across the simple events.
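To make the distinction concrete, here is a hand-rolled, Flink-free sketch of the check a CEP engine effectively performs for that complex event. The `Purchase` record and the sliding three-event window are illustrative stand-ins, not part of any real pipeline:

```java
import java.util.*;

// Hypothetical sketch: detect three purchases totaling over $2,000
// within five minutes, each from a different country. A real CEP engine
// maintains this kind of per-user state incrementally; this version just
// scans a time-ordered history.
public class ComplexEventSketch {
    public record Purchase(long timestampMillis, double amount, String country) {}

    public static boolean isSuspicious(List<Purchase> history) {
        // Slide a window of three consecutive purchases over the history.
        for (int i = 0; i + 2 < history.size(); i++) {
            Purchase a = history.get(i), b = history.get(i + 1), c = history.get(i + 2);
            boolean withinFiveMinutes =
                c.timestampMillis() - a.timestampMillis() <= 5 * 60 * 1000;
            boolean overTotal = a.amount() + b.amount() + c.amount() > 2000.0;
            boolean threeCountries =
                new HashSet<>(List.of(a.country(), b.country(), c.country())).size() == 3;
            if (withinFiveMinutes && overTotal && threeCountries) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Purchase> history = List.of(
            new Purchase(0, 900.0, "US"),
            new Purchase(60_000, 800.0, "UK"),
            new Purchase(120_000, 700.0, "JP"));
        System.out.println(isSuspicious(history)); // $2,400 across 3 countries in 2 minutes
    }
}
```

The point of the sketch is the state: no single purchase is suspicious on its own; the complex event emerges only from correlating several simple events over time.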

CEP vs Traditional Processing

Understanding where CEP fits relative to batch and stream processing is crucial:

| Feature | Batch Processing | Stream Processing | CEP |
| --- | --- | --- | --- |
| Latency | Minutes to hours | Milliseconds to seconds | Milliseconds to seconds |
| Data Model | Bounded datasets | Unbounded streams | Unbounded streams with pattern state |
| Pattern Detection | Post-hoc analysis | Per-event transformations | Multi-event temporal patterns |
| State Management | Minimal (reprocess from scratch) | Windowed aggregations | Pattern match buffers with NFA |
| Use Case Example | Monthly reports | Real-time dashboards | Fraud detection, anomaly sequences |
| Tools | Spark, Hadoop MapReduce | Kafka Streams, Flink DataStream | Flink CEP, Esper, Siddhi |

Real-World CEP Applications

CEP is not a niche technology. It powers some of the most critical systems in the world:

  • Fraud Detection: Banks and payment processors use CEP to catch fraudulent transaction patterns in real time — velocity checks, geographic impossibility, unusual merchant categories.
  • IoT Monitoring: Manufacturing plants and smart buildings use CEP to detect equipment failure sequences before catastrophic breakdowns occur.
  • Algorithmic Trading: Hedge funds detect price-volume patterns across multiple securities within microsecond windows to trigger automated trades.
  • Network Security: SIEM platforms use CEP to correlate firewall logs, authentication events, and data transfer patterns to detect multi-stage cyberattacks.
  • Supply Chain: Real-time tracking of shipment events to detect delays, rerouting needs, or customs anomalies before they cascade.

Why Apache Flink for CEP?

There are several stream processing engines on the market, but Flink stands apart for CEP workloads. Here is why.

Flink was designed from the ground up as a streaming-first engine. Unlike Spark, which bolted streaming onto a batch framework, Flink treats streams as the fundamental data model. This matters enormously for CEP because:

  • DataStream API: Flink’s core API operates on unbounded streams, giving you fine-grained control over event processing, keying, and windowing.
  • Event Time Processing: Flink natively supports event time semantics with watermarks, which is essential for CEP. When you are matching patterns across events, you need to reason about when events actually happened, not when they arrived at your system.
  • Watermarks: Flink’s watermark mechanism tracks the progress of event time through the stream, enabling correct handling of out-of-order events — a constant reality in distributed systems.
  • Flink CEP Library (flink-cep): Flink ships a dedicated CEP library that implements a Non-deterministic Finite Automaton (NFA) for pattern matching. You define patterns declaratively, and the engine handles the complex state management internally.
  • Exactly-Once Semantics: Flink’s checkpointing mechanism guarantees exactly-once processing, so your fraud alerts will never be duplicated or lost.
  • Low Latency: Flink processes events within milliseconds, not micro-batches. For CEP, where you need to match patterns as fast as possible, this is non-negotiable.
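The watermark mechanism from the list above can be illustrated without any Flink code: a bounded-out-of-orderness watermark simply trails the highest event timestamp seen so far by a fixed bound. This is a stdlib-only sketch of that bookkeeping, not Flink's actual `WatermarkGenerator` implementation:

```java
// Conceptual sketch of bounded-out-of-orderness watermarking: the
// watermark trails the maximum observed event timestamp by a fixed bound,
// so events up to `boundMillis` late can still be handled correctly. It
// never moves backwards, even when a late event arrives.
public class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Observe one event timestamp and return the resulting watermark. */
    public long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - boundMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000);
        System.out.println(wm.onEvent(10_000)); // 5000
        System.out.println(wm.onEvent(8_000));  // 5000: a late event does not regress the watermark
        System.out.println(wm.onEvent(20_000)); // 15000
    }
}
```

When the watermark passes the end of a pattern's time window, the CEP engine knows no earlier event can still arrive, so it can safely discard or time out partial matches.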
| Feature | Flink CEP | Kafka Streams | Esper | Spark Structured Streaming | Kinesis Analytics |
| --- | --- | --- | --- | --- | --- |
| Pattern Matching | Built-in, NFA-based | Manual (no CEP library) | EPL query language | No native CEP | SQL-based only |
| Latency | True streaming (ms) | True streaming (ms) | In-memory (ms) | Micro-batch (100ms+) | Near real-time |
| Scalability | Distributed cluster | Embedded scaling | Single JVM | Distributed cluster | AWS managed |
| Exactly-Once | Yes | Yes | No | Yes | Yes |
| Fault Tolerance | Checkpointing + savepoints | Changelog topics | Limited | Checkpointing | Managed snapshots |
| Event Time Support | Native watermarks | Timestamp extractors | Limited | Native watermarks | Limited |
| Best For | Complex temporal patterns at scale | Simple event-driven microservices | Prototyping, embedded CEP | Batch + streaming hybrid | AWS-native SQL analytics |

Key Takeaway: If you need to detect complex temporal patterns across high-volume event streams with exactly-once guarantees, Flink CEP is the strongest choice. Kafka Streams is excellent for simpler event-driven architectures, but it lacks a built-in pattern matching engine. Esper has great CEP semantics but does not scale horizontally.

Setting Up Your Flink CEP Project

Prerequisites

Before we write any code, make sure you have:

  • Java 11 or 17 (Flink 1.18+ supports both; Java 17 is recommended for new projects)
  • Maven 3.8+ or Gradle 7+
  • An IDE — IntelliJ IDEA with the Flink plugin is ideal
  • Docker (optional, for running Kafka and Flink locally)

Project Structure

Here is the layout we will use throughout this guide:

flink-cep-pipeline/
├── pom.xml
├── src/main/java/com/example/cep/
│   ├── FlinkCEPApplication.java
│   ├── events/
│   │   ├── Transaction.java
│   │   ├── SensorReading.java
│   │   └── StockTick.java
│   ├── patterns/
│   │   ├── FraudPatterns.java
│   │   ├── IoTPatterns.java
│   │   └── StockPatterns.java
│   ├── processors/
│   │   ├── FraudAlertProcessor.java
│   │   ├── AnomalyAlertProcessor.java
│   │   └── TradingSignalProcessor.java
│   └── sources/
│       └── KafkaSourceBuilder.java
└── src/main/resources/
    └── log4j2.properties

Maven pom.xml

This is the complete Maven configuration with all the Flink CEP dependencies you need:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-cep-pipeline</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.18.1</flink.version>
        <java.version>17</java.version>
        <kafka.version>3.6.1</kafka.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Flink Core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink CEP Library -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>

        <!-- Flink JSON Format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Clients (for local execution) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.16.1</version>
        </dependency>

        <!-- SLF4J + Log4j2 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.22.1</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <transformers>
                                <transformer implementation=
                                    "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.cep.FlinkCEPApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Gradle Alternative

If you prefer Gradle, here is the equivalent build.gradle.kts:

plugins {
    java
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

val flinkVersion = "1.18.1"

dependencies {
    compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")
    compileOnly("org.apache.flink:flink-clients:$flinkVersion")
    implementation("org.apache.flink:flink-cep:$flinkVersion")
    implementation("org.apache.flink:flink-connector-kafka:3.1.0-1.18")
    implementation("org.apache.flink:flink-json:$flinkVersion")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
    runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.22.1")
    runtimeOnly("org.apache.logging.log4j:log4j-core:2.22.1")
}
Tip: The flink-streaming-java and flink-clients dependencies are marked as provided (Maven) or compileOnly (Gradle) because the Flink cluster already includes them. When running locally in your IDE, add them to your run configuration’s classpath.

Understanding Flink CEP Pattern API

The Flink CEP library gives you a declarative API to define event patterns. Under the hood, it compiles your pattern definition into a Non-deterministic Finite Automaton (NFA) that efficiently matches patterns against the incoming event stream. Let us walk through every major concept.

Pattern Basics

Every pattern starts with Pattern.begin() and chains additional states:

// Strict contiguity: events must be directly adjacent
Pattern<Event, ?> strict = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    })
    .next("second")  // MUST be the very next event
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    })
    .next("third")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getType().equals("login_failed");
        }
    });

// Relaxed contiguity: allows non-matching events in between
Pattern<Event, ?> relaxed = Pattern.<Event>begin("start")
    .where(/* ... */)
    .followedBy("end")  // matching events can have other events between them
    .where(/* ... */);

// Non-deterministic relaxed contiguity:
// matches all possible combinations
Pattern<Event, ?> nonDeterministic = Pattern.<Event>begin("start")
    .where(/* ... */)
    .followedByAny("end")  // considers ALL matching events, not just first
    .where(/* ... */);

Contiguity: Strict, Relaxed, Non-Deterministic

This is one of the most important concepts in Flink CEP. Suppose you have the event stream: A, C, B1, B2 and your pattern is “A followed by B”:

  • next() — Strict: No match. C appears between A and B1, breaking strict contiguity.
  • followedBy() — Relaxed: Matches {A, B1}. Skips C, takes the first matching B.
  • followedByAny() — Non-deterministic relaxed: Matches {A, B1} AND {A, B2}. Considers all possible matching events.
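The three contiguity modes can be simulated in a few lines of plain Java. This is a hand-rolled illustration of the semantics over the stream A, C, B1, B2 — it is not Flink code and does not use the NFA:

```java
import java.util.*;

// Illustration of Flink CEP contiguity semantics for the pattern
// "A followed by B" over a small in-memory stream. Events named "B1",
// "B2", etc. stand in for events matching the B condition.
public class ContiguitySketch {
    /** next(): B must be the event immediately after A. */
    public static List<String> strict(List<String> stream) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < stream.size(); i++) {
            if (stream.get(i).equals("A") && stream.get(i + 1).startsWith("B")) {
                matches.add("{A, " + stream.get(i + 1) + "}");
            }
        }
        return matches;
    }

    /** followedBy(): skip non-matching events, take only the first B after A. */
    public static List<String> relaxed(List<String> stream) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < stream.size(); i++) {
            if (!stream.get(i).equals("A")) continue;
            for (int j = i + 1; j < stream.size(); j++) {
                if (stream.get(j).startsWith("B")) {
                    matches.add("{A, " + stream.get(j) + "}");
                    break; // first matching B only
                }
            }
        }
        return matches;
    }

    /** followedByAny(): every B after A produces its own match. */
    public static List<String> nonDeterministic(List<String> stream) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < stream.size(); i++) {
            if (!stream.get(i).equals("A")) continue;
            for (int j = i + 1; j < stream.size(); j++) {
                if (stream.get(j).startsWith("B")) {
                    matches.add("{A, " + stream.get(j) + "}");
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("A", "C", "B1", "B2");
        System.out.println(strict(stream));           // []
        System.out.println(relaxed(stream));          // [{A, B1}]
        System.out.println(nonDeterministic(stream)); // [{A, B1}, {A, B2}]
    }
}
```

Choosing between these modes directly affects state size: `followedByAny()` can produce combinatorially many partial matches, so use it only when you genuinely need every combination.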

Quantifiers

// Exactly N times
Pattern<Event, ?> exactly3 = Pattern.<Event>begin("failures")
    .where(condition)
    .times(3);  // exactly 3 matching events

// N or more times
Pattern<Event, ?> atLeast3 = Pattern.<Event>begin("failures")
    .where(condition)
    .timesOrMore(3);  // 3 or more matching events

// Range
Pattern<Event, ?> range = Pattern.<Event>begin("failures")
    .where(condition)
    .times(2, 5);  // between 2 and 5 matching events

// One or more (greedy)
Pattern<Event, ?> oneOrMore = Pattern.<Event>begin("failures")
    .where(condition)
    .oneOrMore()
    .greedy();  // match as many as possible

// Optional
Pattern<Event, ?> withOptional = Pattern.<Event>begin("start")
    .where(startCondition)
    .next("middle")
    .where(middleCondition)
    .optional()  // this state may or may not match
    .next("end")
    .where(endCondition);

Conditions

// Simple condition — checks current event only
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getAmount() > 1000.0;
    }
})

// Iterative condition — can reference previously matched events
.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event event, Context<Event> ctx) throws Exception {
        // Compare with previously matched event
        for (Event prev : ctx.getEventsForPattern("start")) {
            if (!event.getLocation().equals(prev.getLocation())) {
                return true;  // different location than start event
            }
        }
        return false;
    }
})

// OR condition
.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("withdrawal");
    }
})
.or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("transfer");
    }
})

// Until condition (stop condition for looping patterns)
.oneOrMore()
.until(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event event) {
        return event.getType().equals("logout");
    }
})

Time Constraints

// The entire pattern must complete within 5 minutes
Pattern<Event, ?> timedPattern = Pattern.<Event>begin("first")
    .where(/* ... */)
    .followedBy("second")
    .where(/* ... */)
    .followedBy("third")
    .where(/* ... */)
    .within(Time.minutes(5));
Caution: The within() constraint applies to the entire pattern, measured from the first matching event. If the first event matches at T=0 and you set within(Time.minutes(5)), the entire pattern must complete before T=5min. Partially matched patterns that time out are discarded (or can be captured via timeout handling, which we will cover later).

Hands-On: Credit Card Fraud Detection Pipeline

Let us build our first complete CEP pipeline — a credit card fraud detection system. This is the classic CEP use case, and we will implement three different fraud patterns.

The Transaction Event Class

package com.example.cep.events;

public class Transaction implements java.io.Serializable {
    private String transactionId;
    private String userId;
    private double amount;
    private long timestamp;
    private String location;
    private String merchantCategory;
    private String cardNumber;

    // Default constructor for serialization
    public Transaction() {}

    public Transaction(String transactionId, String userId, double amount,
                       long timestamp, String location, String merchantCategory,
                       String cardNumber) {
        this.transactionId = transactionId;
        this.userId = userId;
        this.amount = amount;
        this.timestamp = timestamp;
        this.location = location;
        this.merchantCategory = merchantCategory;
        this.cardNumber = cardNumber;
    }

    // Getters and setters
    public String getTransactionId() { return transactionId; }
    public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }
    public String getMerchantCategory() { return merchantCategory; }
    public void setMerchantCategory(String mc) { this.merchantCategory = mc; }
    public String getCardNumber() { return cardNumber; }
    public void setCardNumber(String cardNumber) { this.cardNumber = cardNumber; }

    @Override
    public String toString() {
        return String.format("Transaction{id=%s, user=%s, amount=%.2f, loc=%s, time=%d}",
            transactionId, userId, amount, location, timestamp);
    }
}

The Fraud Alert Class

package com.example.cep.events;

import java.util.List;

public class FraudAlert implements java.io.Serializable {
    private String alertId;
    private String userId;
    private String patternType;
    private String description;
    private List<Transaction> matchedTransactions;
    private long detectedAt;

    public FraudAlert(String alertId, String userId, String patternType,
                      String description, List<Transaction> matchedTransactions) {
        this.alertId = alertId;
        this.userId = userId;
        this.patternType = patternType;
        this.description = description;
        this.matchedTransactions = matchedTransactions;
        this.detectedAt = System.currentTimeMillis();
    }

    // Getters
    public String getAlertId() { return alertId; }
    public String getUserId() { return userId; }
    public String getPatternType() { return patternType; }
    public String getDescription() { return description; }
    public List<Transaction> getMatchedTransactions() { return matchedTransactions; }
    public long getDetectedAt() { return detectedAt; }

    @Override
    public String toString() {
        return String.format("FRAUD ALERT [%s] User: %s | Pattern: %s | %s | Transactions: %d",
            alertId, userId, patternType, description, matchedTransactions.size());
    }
}

Defining Fraud Patterns

Now the interesting part. We will define three fraud detection patterns:

package com.example.cep.patterns;

import com.example.cep.events.Transaction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FraudPatterns {

    /**
     * Pattern 1: Geographic Impossibility
     * Three transactions over $500 within 5 minutes from different locations.
     * If a user is spending in New York, then London, then Tokyo within 5 minutes,
     * something is very wrong.
     */
    public static Pattern<Transaction, ?> geographicImpossibility() {
        return Pattern.<Transaction>begin("first")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() > 500.0;
                }
            })
            .followedBy("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx, Context<Transaction> ctx) throws Exception {
                    if (tx.getAmount() <= 500.0) return false;
                    for (Transaction first : ctx.getEventsForPattern("first")) {
                        if (!tx.getLocation().equals(first.getLocation())) {
                            return true;
                        }
                    }
                    return false;
                }
            })
            .followedBy("third")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx, Context<Transaction> ctx) throws Exception {
                    if (tx.getAmount() <= 500.0) return false;
                    for (Transaction first : ctx.getEventsForPattern("first")) {
                        for (Transaction second : ctx.getEventsForPattern("second")) {
                            if (!tx.getLocation().equals(first.getLocation())
                                && !tx.getLocation().equals(second.getLocation())) {
                                return true;
                            }
                        }
                    }
                    return false;
                }
            })
            .within(Time.minutes(5));
    }

    /**
     * Pattern 2: Card Testing Attack
     * A small "test" transaction ($0.01–$5.00) followed by a large transaction
     * ($1000+) within 1 minute. Fraudsters often test stolen cards with tiny
     * purchases before going big.
     */
    public static Pattern<Transaction, ?> cardTestingAttack() {
        return Pattern.<Transaction>begin("test_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= 0.01 && tx.getAmount() <= 5.0;
                }
            })
            .followedBy("big_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= 1000.0;
                }
            })
            .within(Time.minutes(1));
    }

    /**
     * Pattern 3: Transaction Velocity
     * Five or more transactions within 2 minutes. Even legitimate users
     * rarely make this many purchases in such a short time.
     */
    public static Pattern<Transaction, ?> highVelocity() {
        return Pattern.<Transaction>begin("transactions")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() > 0;
                }
            })
            .timesOrMore(5)
            .within(Time.minutes(2));
    }
}

Processing Matched Patterns

package com.example.cep.processors;

import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.*;

public class FraudAlertProcessor
        extends PatternProcessFunction<Transaction, FraudAlert> {

    private final String patternType;

    public FraudAlertProcessor(String patternType) {
        this.patternType = patternType;
    }

    @Override
    public void processMatch(Map<String, List<Transaction>> match,
                             Context ctx,
                             Collector<FraudAlert> out) {
        // Collect all matched transactions from all pattern states
        List<Transaction> allTransactions = new ArrayList<>();
        match.values().forEach(allTransactions::addAll);

        // Extract user ID from first transaction
        String userId = allTransactions.get(0).getUserId();

        // Build a description
        String description = buildDescription(match);

        // Generate alert
        String alertId = UUID.randomUUID().toString();
        FraudAlert alert = new FraudAlert(
            alertId, userId, patternType, description, allTransactions
        );

        out.collect(alert);
    }

    private String buildDescription(Map<String, List<Transaction>> match) {
        StringBuilder sb = new StringBuilder();
        sb.append("Matched pattern '").append(patternType).append("': ");

        double total = 0;
        Set<String> locations = new HashSet<>();
        int count = 0;

        for (List<Transaction> txList : match.values()) {
            for (Transaction tx : txList) {
                total += tx.getAmount();
                locations.add(tx.getLocation());
                count++;
            }
        }

        sb.append(count).append(" transactions, ");
        sb.append(String.format("total $%.2f, ", total));
        sb.append("locations: ").append(locations);

        return sb.toString();
    }
}

The Complete Fraud Detection Pipeline

Here is the entire pipeline wired together — from Kafka source to fraud alert output:

package com.example.cep;

import com.example.cep.events.FraudAlert;
import com.example.cep.events.Transaction;
import com.example.cep.patterns.FraudPatterns;
import com.example.cep.processors.FraudAlertProcessor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;

public class FraudDetectionPipeline {

    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming execution environment
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Enable checkpointing for exactly-once semantics
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // 2. Create Kafka source for transactions
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detection-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // 3. Read from Kafka with event time watermarks
        ObjectMapper mapper = new ObjectMapper();

        DataStream<Transaction> transactions = env
            .fromSource(kafkaSource, WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> {
                    try {
                        return mapper.readValue(event, Transaction.class)
                            .getTimestamp();
                    } catch (Exception e) {
                        return timestamp;
                    }
                }), "Kafka Transactions")
            .map(json -> mapper.readValue(json, Transaction.class))
            // Lambdas calling generic methods erase the return type, so give
            // Flink an explicit type hint to avoid an InvalidTypesException
            .returns(Transaction.class)
            .keyBy(Transaction::getUserId);  // Key by user for per-user patterns

        // 4. Apply Pattern 1: Geographic Impossibility
        Pattern<Transaction, ?> geoPattern = FraudPatterns.geographicImpossibility();
        PatternStream<Transaction> geoPatternStream = CEP.pattern(
            transactions, geoPattern);

        DataStream<FraudAlert> geoAlerts = geoPatternStream.process(
            new FraudAlertProcessor("GEOGRAPHIC_IMPOSSIBILITY"));

        // 5. Apply Pattern 2: Card Testing Attack
        Pattern<Transaction, ?> testPattern = FraudPatterns.cardTestingAttack();
        PatternStream<Transaction> testPatternStream = CEP.pattern(
            transactions, testPattern);

        DataStream<FraudAlert> testAlerts = testPatternStream.process(
            new FraudAlertProcessor("CARD_TESTING_ATTACK"));

        // 6. Apply Pattern 3: High Velocity
        Pattern<Transaction, ?> velocityPattern = FraudPatterns.highVelocity();
        PatternStream<Transaction> velocityPatternStream = CEP.pattern(
            transactions, velocityPattern);

        DataStream<FraudAlert> velocityAlerts = velocityPatternStream.process(
            new FraudAlertProcessor("HIGH_VELOCITY"));

        // 7. Union all alerts and sink to Kafka
        DataStream<FraudAlert> allAlerts = geoAlerts
            .union(testAlerts)
            .union(velocityAlerts);

        // Print to console (for development)
        allAlerts.print("FRAUD ALERT");

        // Sink to Kafka alerts topic
        KafkaSink<String> alertSink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("fraud-alerts")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .build();

        allAlerts
            .map(alert -> mapper.writeValueAsString(alert))
            .sinkTo(alertSink);

        // 8. Execute the pipeline
        env.execute("Credit Card Fraud Detection CEP Pipeline");
    }
}
Key Takeaway: Notice how we apply multiple independent patterns to the same keyed stream. Each CEP.pattern() call creates a separate NFA instance per key (per user), so patterns are evaluated independently and do not interfere with each other. The keyBy(Transaction::getUserId) call is critical — it ensures that patterns only match events belonging to the same user.

Hands-On: IoT Sensor Anomaly Detection

Our second pipeline detects anomalies in IoT sensor data. The pattern we want to catch: a sensor reports three consecutive rising temperature readings above a threshold within one minute, followed by a pressure drop. This sequence often indicates an impending equipment failure.

Sensor Event Class

package com.example.cep.events;

public class SensorReading implements java.io.Serializable {
    private String sensorId;
    private double temperature;
    private double pressure;
    private long timestamp;
    private String location;

    public SensorReading() {}

    public SensorReading(String sensorId, double temperature, double pressure,
                         long timestamp, String location) {
        this.sensorId = sensorId;
        this.temperature = temperature;
        this.pressure = pressure;
        this.timestamp = timestamp;
        this.location = location;
    }

    public String getSensorId() { return sensorId; }
    public void setSensorId(String sensorId) { this.sensorId = sensorId; }
    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }
    public double getPressure() { return pressure; }
    public void setPressure(double pressure) { this.pressure = pressure; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    @Override
    public String toString() {
        return String.format("Sensor{id=%s, temp=%.1f, pressure=%.1f, time=%d}",
            sensorId, temperature, pressure, timestamp);
    }
}

Complete IoT Anomaly Pipeline

package com.example.cep;

import com.example.cep.events.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

public class IoTAnomalyDetectionPipeline {

    private static final double TEMP_THRESHOLD = 85.0; // degrees Celsius
    private static final double PRESSURE_DROP_THRESHOLD = 10.0; // PSI

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.enableCheckpointing(30_000);

        // Simulated sensor data source (replace with Kafka in production)
        DataStream<SensorReading> sensorStream = env
            .addSource(new SimulatedSensorSource()) // your custom source
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((reading, ts) -> reading.getTimestamp())
            )
            .keyBy(SensorReading::getSensorId);

        // Pattern: 3 consecutive high-temp readings, then a pressure drop
        Pattern<SensorReading, ?> anomalyPattern = Pattern
            .<SensorReading>begin("rising_temp_1")
            .where(new SimpleCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading) {
                    return reading.getTemperature() > TEMP_THRESHOLD;
                }
            })
            .next("rising_temp_2")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
                        return reading.getTemperature() > prev.getTemperature();
                    }
                    return false;
                }
            })
            .next("rising_temp_3")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    if (reading.getTemperature() <= TEMP_THRESHOLD) return false;
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_2")) {
                        return reading.getTemperature() > prev.getTemperature();
                    }
                    return false;
                }
            })
            .followedBy("pressure_drop")
            .where(new IterativeCondition<SensorReading>() {
                @Override
                public boolean filter(SensorReading reading,
                                      Context<SensorReading> ctx) throws Exception {
                    for (SensorReading prev : ctx.getEventsForPattern("rising_temp_1")) {
                        double pressureDiff = prev.getPressure() - reading.getPressure();
                        return pressureDiff > PRESSURE_DROP_THRESHOLD;
                    }
                    return false;
                }
            })
            .within(Time.minutes(1));

        // Apply pattern and process matches
        PatternStream<SensorReading> patternStream =
            CEP.pattern(sensorStream, anomalyPattern);

        DataStream<String> anomalyAlerts = patternStream.process(
            new PatternProcessFunction<SensorReading, String>() {
                @Override
                public void processMatch(Map<String, List<SensorReading>> match,
                                         Context ctx,
                                         Collector<String> out) {
                    SensorReading first = match.get("rising_temp_1").get(0);
                    SensorReading second = match.get("rising_temp_2").get(0);
                    SensorReading third = match.get("rising_temp_3").get(0);
                    SensorReading drop = match.get("pressure_drop").get(0);

                    String alert = String.format(
                        "ANOMALY DETECTED | Sensor: %s | Location: %s | " +
                        "Temps: %.1f -> %.1f -> %.1f (threshold: %.1f) | " +
                        "Pressure drop: %.1f -> %.1f (delta: %.1f)",
                        first.getSensorId(), first.getLocation(),
                        first.getTemperature(), second.getTemperature(),
                        third.getTemperature(), TEMP_THRESHOLD,
                        first.getPressure(), drop.getPressure(),
                        first.getPressure() - drop.getPressure()
                    );

                    out.collect(alert);
                }
            }
        );

        anomalyAlerts.print("IOT ALERT");
        env.execute("IoT Sensor Anomaly Detection Pipeline");
    }
}
Tip: Notice we use next() (strict contiguity) for the three rising temperature readings — they must be consecutive. But we use followedBy() (relaxed) for the pressure drop, because other normal readings might occur between the temperature spike and the pressure change.
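The difference between the two contiguity modes is easy to see on a toy sequence. Here is a dependency-free simulation — the class and method names are invented for illustration, and Flink's NFA of course does this with full state management rather than list scans:

```java
import java.util.List;

public class ContiguityDemo {

    // Strict contiguity (next): "b" must be the event immediately after "a"
    static boolean matchesNext(List<String> events) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals("a") && events.get(i + 1).equals("b")) return true;
        }
        return false;
    }

    // Relaxed contiguity (followedBy): "b" may appear anywhere after "a"
    static boolean matchesFollowedBy(List<String> events) {
        int firstA = events.indexOf("a");
        return firstA >= 0 && events.subList(firstA + 1, events.size()).contains("b");
    }

    public static void main(String[] args) {
        List<String> stream = List.of("a", "x", "b"); // "x" sits between a and b
        System.out.println(matchesNext(stream));       // strict: no match
        System.out.println(matchesFollowedBy(stream)); // relaxed: match
    }
}
```

In the IoT pattern, a normal reading between the temperature spike and the pressure drop plays the role of "x": it breaks next() but not followedBy().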

Hands-On: Stock Market Pattern Detection

Our third pipeline detects potential trading signals: a price drop greater than 5% followed by a high volume spike within 10 seconds. This pattern can indicate panic selling followed by institutional buying — a potential buy signal.

StockTick Event Class

package com.example.cep.events;

public class StockTick implements java.io.Serializable {
    private String symbol;
    private double price;
    private long volume;
    private long timestamp;
    private double previousClose;

    public StockTick() {}

    public StockTick(String symbol, double price, long volume,
                     long timestamp, double previousClose) {
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
        this.timestamp = timestamp;
        this.previousClose = previousClose;
    }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public long getVolume() { return volume; }
    public void setVolume(long volume) { this.volume = volume; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public double getPreviousClose() { return previousClose; }
    public void setPreviousClose(double pc) { this.previousClose = pc; }

    public double getPriceChangePercent() {
        if (previousClose == 0) return 0;
        return ((price - previousClose) / previousClose) * 100.0;
    }

    @Override
    public String toString() {
        return String.format("StockTick{sym=%s, price=%.2f, vol=%d, change=%.2f%%}",
            symbol, price, volume, getPriceChangePercent());
    }
}

Complete Stock Market Detection Pipeline

package com.example.cep;

import com.example.cep.events.StockTick;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

public class StockPatternDetectionPipeline {

    private static final double PRICE_DROP_THRESHOLD = -5.0; // percent
    private static final double VOLUME_SPIKE_MULTIPLIER = 3.0; // 3x average

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(10_000);

        // Assume a Kafka source producing StockTick JSON
        // (using simulated source for this example)
        DataStream<StockTick> tickStream = env
            .addSource(new SimulatedStockSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<StockTick>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner((tick, ts) -> tick.getTimestamp())
            )
            .keyBy(StockTick::getSymbol);

        // Pattern: Price drop > 5% followed by volume spike within 10 seconds
        Pattern<StockTick, ?> buySignalPattern = Pattern
            .<StockTick>begin("price_drop")
            .where(new SimpleCondition<StockTick>() {
                @Override
                public boolean filter(StockTick tick) {
                    return tick.getPriceChangePercent() < PRICE_DROP_THRESHOLD;
                }
            })
            .followedBy("volume_spike")
            .where(new IterativeCondition<StockTick>() {
                @Override
                public boolean filter(StockTick tick, Context<StockTick> ctx) throws Exception {
                    for (StockTick drop : ctx.getEventsForPattern("price_drop")) {
                        // Volume must be at least 3x the volume during the drop
                        if (tick.getVolume() > drop.getVolume() * VOLUME_SPIKE_MULTIPLIER) {
                            return true;
                        }
                    }
                    return false;
                }
            })
            .within(Time.seconds(10));

        // Apply pattern
        PatternStream<StockTick> patternStream =
            CEP.pattern(tickStream, buySignalPattern);

        DataStream<String> signals = patternStream.process(
            new PatternProcessFunction<StockTick, String>() {
                @Override
                public void processMatch(Map<String, List<StockTick>> match,
                                         Context ctx,
                                         Collector<String> out) {
                    StockTick drop = match.get("price_drop").get(0);
                    StockTick spike = match.get("volume_spike").get(0);

                    String signal = String.format(
                        "BUY SIGNAL | %s | Drop: %.2f%% (price $%.2f) | " +
                        "Volume spike: %d -> %d (%.1fx) | " +
                        "Current price: $%.2f",
                        drop.getSymbol(),
                        drop.getPriceChangePercent(),
                        drop.getPrice(),
                        drop.getVolume(),
                        spike.getVolume(),
                        (double) spike.getVolume() / drop.getVolume(),
                        spike.getPrice()
                    );

                    out.collect(signal);
                }
            }
        );

        signals.print("TRADING SIGNAL");
        env.execute("Stock Market Pattern Detection Pipeline");
    }
}
Caution: This is an educational example of pattern detection, not investment advice. Real algorithmic trading systems incorporate far more signals, risk management, and regulatory safeguards. Do not trade based solely on a single CEP pattern.

Advanced CEP Techniques

Once you have the basics working, these advanced techniques will take your CEP pipelines to production quality.

Dynamic Patterns from External Configuration

Hardcoding patterns is fine for getting started, but production systems need to update rules without redeploying. One approach is loading pattern parameters from an external source:

// Load thresholds from a configuration source
public class DynamicFraudPatterns {

    public static Pattern<Transaction, ?> fromConfig(FraudRuleConfig config) {
        return Pattern.<Transaction>begin("test_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= config.getMinTestAmount()
                        && tx.getAmount() <= config.getMaxTestAmount();
                }
            })
            .followedBy("big_charge")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction tx) {
                    return tx.getAmount() >= config.getLargeTransactionThreshold();
                }
            })
            .within(Time.minutes(config.getTimeWindowMinutes()));
    }
}

// Configuration POJO loaded from database, file, or broadcast stream
public class FraudRuleConfig implements java.io.Serializable {
    private double minTestAmount = 0.01;
    private double maxTestAmount = 5.0;
    private double largeTransactionThreshold = 1000.0;
    private int timeWindowMinutes = 1;

    // getters and setters...
}
Tip: For truly dynamic pattern updates without restarting the Flink job, consider using Flink’s Broadcast State to push new rule configurations to all parallel instances. The CEP library itself does not support changing patterns at runtime, but you can implement a custom operator that re-creates patterns when it receives new configurations via a broadcast stream.
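The core of that approach is treating rules as data that can be swapped at runtime. Here is a dependency-free sketch of the idea (all names invented for illustration): the evaluator holds the current rule snapshot and replaces it when a new version arrives, which is what the broadcast side of a BroadcastProcessFunction would do with broadcast state.

```java
public class RuleEvaluator {

    // Immutable rule snapshot — in Flink this would live in broadcast state
    static final class Rules {
        final double largeTxThreshold;
        Rules(double largeTxThreshold) { this.largeTxThreshold = largeTxThreshold; }
    }

    private volatile Rules current;

    RuleEvaluator(Rules initial) { this.current = initial; }

    // Broadcast side: replace the active rules without restarting anything
    void updateRules(Rules newRules) { this.current = newRules; }

    // Data side: evaluate each event against whatever rules are active now
    boolean isLarge(double amount) { return amount >= current.largeTxThreshold; }

    public static void main(String[] args) {
        RuleEvaluator eval = new RuleEvaluator(new Rules(1000.0));
        System.out.println(eval.isLarge(800.0));  // false under the old rules
        eval.updateRules(new Rules(500.0));       // pushed via broadcast stream
        System.out.println(eval.isLarge(800.0));  // true under the new rules
    }
}
```

The limitation stands, though: this swaps rule *parameters* cheaply, while changing the *shape* of a CEP pattern still requires rebuilding the NFA.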

Side Outputs for Timeout Handling

When a partial pattern match times out (the within() window expires before the pattern completes), you can capture these timed-out partial matches using TimedOutPartialMatchHandler:

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.OutputTag;

public class FraudAlertWithTimeout
        extends PatternProcessFunction<Transaction, FraudAlert>
        implements TimedOutPartialMatchHandler<Transaction> {

    // Side output for timed-out partial matches
    public static final OutputTag<String> TIMEOUT_TAG =
        new OutputTag<String>("timed-out-patterns") {};

    @Override
    public void processMatch(Map<String, List<Transaction>> match,
                             Context ctx,
                             Collector<FraudAlert> out) {
        // Process fully matched pattern (same as before)
        // ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Transaction>> match,
                                     Context ctx) {
        // A partial match timed out — log it for analysis
        StringBuilder sb = new StringBuilder("PARTIAL MATCH TIMEOUT: ");
        for (Map.Entry<String, List<Transaction>> entry : match.entrySet()) {
            sb.append(entry.getKey()).append("=")
              .append(entry.getValue().size()).append(" events; ");
        }

        // Output to side output
        ctx.output(TIMEOUT_TAG, sb.toString());
    }
}

// In your pipeline, capture the side output:
SingleOutputStreamOperator<FraudAlert> alerts = patternStream
    .process(new FraudAlertWithTimeout());

DataStream<String> timedOutPatterns = alerts
    .getSideOutput(FraudAlertWithTimeout.TIMEOUT_TAG);

timedOutPatterns.print("TIMEOUT");

Scaling CEP Jobs

CEP pattern matching is stateful — the NFA maintains partial match buffers per key. Here are the scaling considerations:

  • Key Partitioning: Always keyBy() your stream before applying CEP patterns. This ensures events for the same entity (user, sensor, stock symbol) go to the same parallel instance.
  • Parallelism: Set parallelism based on your key cardinality. If you have 10,000 users, a parallelism of 8-16 is usually sufficient. Flink distributes keys across parallel instances using hash partitioning.
  • State Size: Each active partial match consumes memory. If you have long time windows or high-cardinality patterns, monitor your state size carefully.
// Set different parallelism for different pipeline stages
DataStream<Transaction> transactions = env
    .fromSource(kafkaSource, watermarkStrategy, "source")
    .setParallelism(8)  // match Kafka partitions
    .map(json -> mapper.readValue(json, Transaction.class))
    .setParallelism(8)
    .keyBy(Transaction::getUserId);

// CEP pattern matching — the CEP operator takes its parallelism from the
// process() call on the PatternStream (a KeyedStream has no setParallelism
// of its own), so set it on the resulting stream:
PatternStream<Transaction> patternStream = CEP.pattern(transactions, fraudPattern);

DataStream<FraudAlert> alerts = patternStream
    .process(new FraudAlertProcessor("FRAUD"))
    .setParallelism(16);  // more parallelism for CPU-heavy matching

State Management and Checkpointing

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Configure robust checkpointing
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(30_000);
checkpointConfig.setCheckpointTimeout(120_000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setTolerableCheckpointFailureNumber(3);

// Retain checkpoints on cancellation (for savepoint-like recovery)
checkpointConfig.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

Event Time vs Processing Time

This distinction is absolutely critical for CEP. Event time is when the event actually happened (embedded in the event data). Processing time is when your Flink operator processes the event. In a perfect world, these would be identical. In reality, events arrive late, out of order, and at variable rates.

Why Event Time Matters for CEP

Consider a fraud detection pattern: “three transactions within 5 minutes.” If transaction #2 arrives at your system 10 seconds late due to network congestion, processing time would see a gap that does not actually exist. Event time correctly identifies that the three transactions occurred within the 5-minute window, regardless of when they arrived.
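A small dependency-free example makes this concrete (timestamps in milliseconds; the helper name is invented for illustration). By event time the three transactions span 4 minutes 50 seconds and fit the window; by arrival time, the delayed second transaction stretches the observed span past 5 minutes and the pattern is missed:

```java
public class TimeSemantics {

    static final long FIVE_MINUTES_MS = 5 * 60 * 1000;

    // True if max(timestamps) - min(timestamps) fits inside the window
    static boolean withinWindow(long[] timestamps, long windowMs) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long t : timestamps) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return max - min <= windowMs;
    }

    public static void main(String[] args) {
        // When the transactions actually happened (event time)
        long[] eventTimes   = {0, 120_000, 290_000};  // span: 4m50s
        // When they reached the pipeline — #2 was delayed in transit
        long[] arrivalTimes = {0, 310_000, 290_000};  // span: 5m10s

        System.out.println(withinWindow(eventTimes, FIVE_MINUTES_MS));   // matches
        System.out.println(withinWindow(arrivalTimes, FIVE_MINUTES_MS)); // missed
    }
}
```

Event-time processing with watermarks lets Flink wait for stragglers and evaluate the pattern against the timestamps embedded in the events themselves.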

Watermark Strategies

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;

// Strategy 1: Bounded out-of-orderness (most common)
// Assumes events can arrive up to 5 seconds late
WatermarkStrategy<Transaction> strategy1 = WatermarkStrategy
    .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());

// Strategy 2: Monotonous timestamps (events always in order)
// Only use if you can guarantee ordering
WatermarkStrategy<Transaction> strategy2 = WatermarkStrategy
    .<Transaction>forMonotonousTimestamps()
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());

// Strategy 3: Custom watermark generator for complex scenarios
WatermarkStrategy<Transaction> strategy3 = WatermarkStrategy
    .<Transaction>forGenerator(context -> new WatermarkGenerator<Transaction>() {
        private static final long MAX_DELAY = 10_000L; // 10 seconds
        // Start above Long.MIN_VALUE so maxTimestamp - MAX_DELAY cannot underflow
        private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY + 1;

        @Override
        public void onEvent(Transaction tx, long eventTimestamp,
                            WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, tx.getTimestamp());
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(
                new org.apache.flink.api.common.eventtime.Watermark(
                    maxTimestamp - MAX_DELAY
                )
            );
        }
    })
    .withTimestampAssigner((tx, recordTimestamp) -> tx.getTimestamp());
Key Takeaway: For most CEP applications, forBoundedOutOfOrderness() with a 5-10 second bound is the right choice. Set it too low and you will miss late events. Set it too high and your pattern matching will be delayed by that amount, since Flink cannot process an event time window until the watermark passes it.
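That tradeoff can be stated as arithmetic. With a bound of B, the watermark trails the highest timestamp seen so far by B, and an event counts as late once its timestamp is at or below the current watermark. A dependency-free sketch, mirroring the formula Flink's bounded-out-of-orderness strategy uses (watermark = maxTimestamp - bound - 1):

```java
public class WatermarkMath {

    // Bounded-out-of-orderness formula:
    // watermark = highest timestamp seen so far - bound - 1
    static long watermark(long maxTimestampSeen, long boundMs) {
        return maxTimestampSeen - boundMs - 1;
    }

    // An event is late if its timestamp is not after the current watermark
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long bound = 5_000;                   // 5-second bound
        long wm = watermark(100_000, bound);  // trails the stream by ~5s

        System.out.println(isLate(96_000, wm)); // 4s behind the max: still on time
        System.out.println(isLate(90_000, wm)); // 10s behind: dropped as late
    }
}
```

A larger bound moves the watermark further back: fewer events are dropped, but every pattern match waits that much longer to fire.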

Connecting to Real Data Sources

Kafka Source Connector

Most production CEP pipelines read from Apache Kafka. Here is a complete Kafka source setup suitable as a production starting point (substitute your own brokers and credentials):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import com.fasterxml.jackson.databind.ObjectMapper;

// Custom deserializer for Transaction events
public class TransactionDeserializer
        implements DeserializationSchema<Transaction> {

    private transient ObjectMapper mapper;

    @Override
    public Transaction deserialize(byte[] message) {
        if (mapper == null) mapper = new ObjectMapper();
        try {
            return mapper.readValue(message, Transaction.class);
        } catch (Exception e) {
            // Log and skip malformed events
            System.err.println("Failed to deserialize: " + new String(message));
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Transaction nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}

// Build the Kafka source
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setTopics("transactions")
    .setGroupId("fraud-detection-v2")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new TransactionDeserializer())
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"api-key\" password=\"api-secret\";")
    .build();

Kafka Sink for Alerts

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

KafkaSink<String> alertSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka-broker-1:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("fraud-alerts")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("fraud-alert-sink")
    .build();

// Wire it up
allAlerts
    .map(alert -> mapper.writeValueAsString(alert))
    .sinkTo(alertSink);

JDBC Connector for Enrichment

You might want to enrich events with data from a database (for example, looking up a customer’s risk score before applying CEP patterns). Flink’s async I/O is ideal for this:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Async enrichment function
public class CustomerEnrichment
        extends RichAsyncFunction<Transaction, EnrichedTransaction> {

    private transient DataSource dataSource;

    @Override
    public void open(Configuration parameters) {
        // Initialize connection pool
        dataSource = createConnectionPool();
    }

    @Override
    public void asyncInvoke(Transaction tx,
                            ResultFuture<EnrichedTransaction> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement stmt = conn.prepareStatement(
                     "SELECT risk_score, account_age FROM customers WHERE id = ?")) {
                stmt.setString(1, tx.getUserId());
                ResultSet rs = stmt.executeQuery();
                if (rs.next()) {
                    return new EnrichedTransaction(tx,
                        rs.getDouble("risk_score"),
                        rs.getInt("account_age"));
                }
                return new EnrichedTransaction(tx, 0.5, 0);
            } catch (Exception e) {
                return new EnrichedTransaction(tx, 0.5, 0);
            }
        }).thenAccept(result -> resultFuture.complete(
            Collections.singleton(result)));
    }
}

// Apply async enrichment before CEP
DataStream<EnrichedTransaction> enriched = AsyncDataStream
    .unorderedWait(
        transactionStream,
        new CustomerEnrichment(),
        30, TimeUnit.SECONDS, // timeout
        100 // max concurrent requests
    );

Flink also supports connectors for Apache Pulsar, Amazon Kinesis, and many other systems through its connector ecosystem. The setup is similar — define a source, assign watermarks, and feed the stream into your CEP patterns.

Deploying and Monitoring

Running Locally for Development

The simplest way to develop is running directly in your IDE. Flink will create a local mini-cluster:

// This works out of the box in your IDE
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Flink automatically creates a local mini-cluster

Docker Compose for Local Flink + Kafka

For integration testing, use this Docker Compose setup to run Flink and Kafka locally:

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  flink-jobmanager:
    image: flink:1.18.1-java17
    ports:
      - "8081:8081"  # Flink Web UI
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        state.backend: rocksdb
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints

  flink-taskmanager:
    image: flink:1.18.1-java17
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2  # Run 2 task managers
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m

Deploying to a Flink Cluster

Build your fat JAR and submit it to the cluster:

# Build the fat JAR
mvn clean package -DskipTests

# Submit to standalone cluster
./bin/flink run \
  -c com.example.cep.FraudDetectionPipeline \
  target/flink-cep-pipeline-1.0.0.jar

# Submit to YARN in per-job mode
# (the old -yn flag was removed in Flink 1.10; memory and slots are
# configured via -D options instead)
./bin/flink run -t yarn-per-job \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=8 \
  -c com.example.cep.FraudDetectionPipeline \
  target/flink-cep-pipeline-1.0.0.jar

# Submit to Kubernetes (using Flink Kubernetes Operator)
kubectl apply -f flink-cep-deployment.yaml

Monitoring Your Pipeline

The Flink Web UI (default port 8081) is your primary monitoring interface. Key metrics to watch:

  • Checkpoint Duration: If checkpoints take longer than your interval, you will see cascading delays. Keep checkpoint duration under 50% of the checkpoint interval.
  • Backpressure: When a downstream operator cannot keep up, backpressure propagates upstream. The Web UI shows this with color-coded task states — red means trouble.
  • Throughput (records/second): Monitor input and output rates for each operator. A sudden drop in output with constant input suggests a processing bottleneck.
  • State Size: CEP patterns maintain partial match buffers. Watch state size grow over time — unbounded growth indicates a pattern or key space issue.

Performance Optimization

Getting a CEP pipeline to work is one thing. Getting it to handle production volumes efficiently is another. Here are the key tuning levers.

Choosing the Right Parallelism

Parallelism controls how many parallel instances of each operator Flink runs. For CEP pipelines:

  • Source parallelism: Match the number of Kafka partitions. If your topic has 16 partitions, set source parallelism to 16.
  • CEP operator parallelism: This depends on your key cardinality and pattern complexity. Start with the same parallelism as your source, then increase if you see backpressure on the CEP operator.
  • Sink parallelism: Usually lower than CEP parallelism since alert volume is much lower than input volume.
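Cluster sizing then follows from simple slot arithmetic: with Flink's default slot sharing, a job needs at least as many slots as its highest operator parallelism, since one slot can host one parallel slice of the whole pipeline. A small helper with invented names:

```java
public class ClusterSizing {

    // With default slot sharing, required slots = the highest parallelism
    // of any operator in the job
    static int requiredSlots(int... operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) max = Math.max(max, p);
        return max;
    }

    // TaskManagers needed, rounding up
    static int taskManagers(int slots, int slotsPerTaskManager) {
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // source=8, CEP=16, sink=4 -> 16 slots -> 4 TMs with 4 slots each
        int slots = requiredSlots(8, 16, 4);
        System.out.println(slots);
        System.out.println(taskManagers(slots, 4));
    }
}
```

So raising CEP parallelism from 16 to 32 doubles the slot requirement even if source and sink parallelism stay put.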

State Backend Selection

State Backend               | State Size          | Speed             | Best For
HashMapStateBackend (heap)  | Limited by JVM heap | Fastest           | Small state, low-latency requirements
EmbeddedRocksDBStateBackend | Limited by disk     | Slower (disk I/O) | Large state, long time windows

For CEP specifically: if your patterns have short time windows (seconds to minutes) and moderate key cardinality, the heap state backend is fine. For long time windows (hours) or millions of keys with active partial matches, RocksDB is safer.

Typical starting configurations by use case:

  • Fraud Detection: parallelism 8–32; checkpoint interval 60s; RocksDB state backend; 5s watermark bound; 4–8 GB TaskManager memory; Avro or Protobuf serialization.
  • IoT Monitoring: parallelism 4–16; checkpoint interval 30s; Heap or RocksDB state backend; 3s watermark bound; 2–4 GB TaskManager memory; Avro serialization.
  • Market Data: parallelism 16–64; checkpoint interval 10s; Heap state backend; 1s watermark bound; 8–16 GB TaskManager memory; Protobuf serialization (smallest size).

Serialization Considerations

Flink’s default Java serialization is slow and produces large state snapshots. For production CEP pipelines, register your event types with Flink’s type system or use efficient serialization:

// Register types for efficient serialization
// (ProtobufSerializer here is com.twitter.chill.protobuf.ProtobufSerializer,
// pulled in via the chill-protobuf dependency)
env.getConfig().registerTypeWithKryoSerializer(
    Transaction.class, ProtobufSerializer.class);

// Or use Flink's POJO serialization (automatic for well-formed POJOs)
// Ensure your classes:
// 1. Are public and have a public no-arg constructor
// 2. Have all fields either public or accessible via getters/setters
// 3. Use field types that Flink can itself serialize
// (implementing java.io.Serializable is not required for POJO serialization)

// For Avro serialization, use Flink's Avro format
// Add dependency: flink-avro
// Then use AvroDeserializationSchema (forSpecific() requires Transaction
// to be an Avro-generated SpecificRecord class):
import org.apache.flink.formats.avro.AvroDeserializationSchema;

KafkaSource<Transaction> avroSource = KafkaSource.<Transaction>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("transactions-avro")
    .setGroupId("fraud-detection")
    .setValueOnlyDeserializer(
        AvroDeserializationSchema.forSpecific(Transaction.class))
    .build();

Common Pitfalls and Troubleshooting

Here are the issues that trip up most developers when building Flink CEP pipelines:

  • Pattern never matches. Cause: events arrive out of order, a within() window that is too tight, or using next() where followedBy() is needed. Fix: check event ordering, increase the time window, or relax the contiguity mode.
  • Too many matches (false positives). Cause: pattern conditions are too loose, or followedByAny() generates a combinatorial explosion. Fix: add tighter conditions, switch to followedBy(), or shorten the time window.
  • OutOfMemoryError. Cause: large NFA state from long time windows, high key cardinality, or followedByAny() combined with oneOrMore(). Fix: switch to the RocksDB state backend, shorten time windows, or add until() conditions.
  • Checkpoint failures. Cause: state too large to snapshot within the timeout, or backpressure causing delays. Fix: increase the checkpoint timeout, enable incremental checkpointing with RocksDB, or reduce state size.
  • Watermark stalling (no progress). Cause: one Kafka partition has no data, so its watermark stays at Long.MIN_VALUE and blocks the global watermark. Fix: use withIdleness(Duration.ofMinutes(1)) on the watermark strategy.
  • Duplicate alerts after restart. Cause: reprocessing events without checkpointed state. Fix: always restart from a savepoint/checkpoint and enable exactly-once on sinks.
  • ClassNotFoundException at runtime. Cause: flink-cep missing from the fat JAR, usually because it was marked as provided by mistake. Fix: ensure flink-cep is bundled in the fat JAR; only flink-streaming-java and flink-clients should be marked as provided.

Fixing Watermark Stalling

This is one of the most frustrating issues. If one Kafka partition stops producing events, its watermark stays at negative infinity, which blocks the global watermark for the entire job. The fix is simple:

WatermarkStrategy<Transaction> strategy = WatermarkStrategy
    .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((tx, ts) -> tx.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));  // Mark source as idle after 1 min

Debugging Pattern Matches

When patterns are not matching as expected, add a pass-through map before your CEP operator to verify events are flowing and are correctly keyed:

// Debug: print events as they enter the CEP operator
KeyedStream<Transaction, String> debugInput = transactions
    .map(tx -> {
        System.out.println("CEP INPUT: " + tx);
        return tx;
    })
    .returns(Transaction.class)  // lambdas lose generic type info; declare it
    .keyBy(Transaction::getUserId);
// Apply your pattern to debugInput instead of the original stream

// Also: check that your conditions actually match
// by testing them in a unit test
@Test
public void testFraudCondition() {
    Transaction tx = new Transaction("1", "user1", 600.0,
        System.currentTimeMillis(), "NYC", "electronics", "1234");
    assertTrue(tx.getAmount() > 500.0);  // Verify condition logic
}

Conclusion

Complex Event Processing with Apache Flink gives you the ability to detect sophisticated patterns across millions of events per second with millisecond latency and exactly-once guarantees. We have covered a lot of ground in this guide — from the fundamentals of CEP and the Flink pattern API to three complete, production-style pipelines for fraud detection, IoT monitoring, and financial market analysis.

The key takeaways to remember:

  • Choose the right contiguity: next() for strict sequences, followedBy() for relaxed matching, and followedByAny() sparingly (it is expensive).
  • Always use event time with proper watermark strategies. Processing time will give you incorrect pattern matches in any real-world system where events arrive out of order.
  • Key your streams: CEP patterns should almost always be applied to keyed streams so patterns match within a logical entity (user, sensor, stock symbol).
  • Handle timeouts: Implement TimedOutPartialMatchHandler to capture and analyze partial matches that do not complete within the time window.
  • Monitor state size: CEP is inherently stateful. Use RocksDB for large state, keep time windows as short as possible, and watch for combinatorial explosion with non-deterministic patterns.
  • Start simple, iterate: Begin with a single pattern on a small data sample. Verify it works correctly before adding complexity or scaling up.

Flink’s CEP library is one of the most powerful pattern-matching engines available in the open-source ecosystem. With the patterns and techniques in this guide, you have everything you need to build your first production CEP pipeline. Start with the fraud detection example, adapt it to your domain, and scale from there.
