PySpark, from the ground up · Lesson 51 / 60

Kafka source: the most common production ingest

How Spark reads from Kafka, the offset semantics, and the at-least-once vs exactly-once question.

If you stand up a real Structured Streaming job in production, the input is, in my experience, Kafka roughly nine times out of ten. Files-on-a-directory works for tutorials and the occasional log-rotation pipeline; sockets are a toy; the rate source is for benchmarking. Kafka is the durable, replayable, partitioned, multi-consumer event log that most teams actually build streaming systems on top of. So that's where this lesson lives.

I’m not going to teach you Kafka itself — there are entire books for that. The 30-second version: Kafka stores append-only logs called topics, each split into partitions for parallelism. Producers write messages, each getting a numeric offset within its partition. Consumers read messages by tracking which offsets they’ve processed, per partition. The log is durable (replicated across brokers) and replayable (offsets don’t get reused, so you can re-read history as long as it’s within the retention window). That’s the foundation. Spark plugs into it as a consumer.

The basic call

events = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
            .option("subscribe", "user-events")
            .option("startingOffsets", "latest")
            .load())

A few things to flag immediately:

  • The connector is format("kafka"), published on Maven as org.apache.spark:spark-sql-kafka-0-10_2.13:<spark-version>. It doesn't ship with default Spark, so you add it via --packages or --jars, and both the version and the Scala suffix (_2.13 here) must match your Spark build. The 0-10 in the artifact name is the Kafka client API version, not the Kafka broker version; it works against any broker from 0.10 onward, which is anything you'd realistically be running in 2026.
  • kafka.bootstrap.servers is a comma-separated list of brokers to bootstrap from. The Kafka client contacts one of them to discover the rest of the cluster: every broker, plus which broker leads each partition. You don't need to list all brokers; two or three is enough that one being down doesn't block startup.
  • subscribe picks the topic(s). You can subscribe to multiple by comma-separating: "orders,refunds,returns". For pattern-based subscription, use subscribePattern with a (Java) regex: "events_.*". There's also assign for manual partition selection, which takes JSON like {"user-events": [0, 1]}, but you almost never want it; let Spark track the topic's partitions for you. Set exactly one of the three, or Spark rejects the query.
  • startingOffsets is critical and we’ll get to it in a minute.
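
Only one of subscribe, subscribePattern, and assign may be set, so it can help to assemble the reader's options in one place. The following is a minimal plain-Python sketch. kafka_source_options is a hypothetical helper, and the dict it returns is meant to be passed as spark.readStream.format("kafka").options(**opts).

```python
import json


def kafka_source_options(bootstrap_servers, *, subscribe=None,
                         subscribe_pattern=None, assign=None,
                         starting_offsets="latest"):
    """Hypothetical helper: build Kafka source options for DataStreamReader.options().

    Exactly one of subscribe / subscribe_pattern / assign must be given,
    mirroring the connector's own rule.
    """
    chosen = [name for name, value in (("subscribe", subscribe),
                                       ("subscribePattern", subscribe_pattern),
                                       ("assign", assign)) if value is not None]
    if len(chosen) != 1:
        raise ValueError(f"set exactly one of subscribe/subscribePattern/assign, got {chosen}")

    opts = {
        # A few brokers is enough; the client discovers the rest of the cluster.
        "kafka.bootstrap.servers": ",".join(bootstrap_servers),
        "startingOffsets": starting_offsets,
    }
    if subscribe is not None:
        opts["subscribe"] = ",".join(subscribe)          # e.g. "orders,refunds,returns"
    elif subscribe_pattern is not None:
        opts["subscribePattern"] = subscribe_pattern     # Java regex, e.g. "events_.*"
    else:
        # assign takes JSON of the form {"topic": [partition, ...]}
        opts["assign"] = json.dumps(assign)
    return opts
```

The assign form is where hand-written JSON quoting usually goes wrong, which is the main reason to generate it.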

What the DataFrame looks like

Calling events.printSchema() reveals the columns Spark exposes from Kafka. The headers column only appears if you also set .option("includeHeaders", "true"):

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array<struct<key:string,value:binary>> (nullable = true)

The interesting ones:

  • key and value are binary. Kafka doesn’t know or care what’s in your messages; it stores bytes. Spark surfaces those bytes faithfully. If you printed value as-is you’d see hex blobs. You have to deserialize.
  • topic, partition, offset identify exactly where this row came from in Kafka. Useful for debugging, lineage, and offset-aware sinks.
  • timestamp is the record timestamp stored with each message. It is either the time the producer created the record (CreateTime) or the time the broker appended it (LogAppendTime), depending on the topic's message.timestamp.type config, and the timestampType column tells you which. This is not your event time. If your message body has its own event_ts field, that's what you should use for windowing (lesson 52); the Kafka timestamp is processing-side metadata.
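
Kafka encodes the timestamp type as a small integer id, and Spark passes that id through in timestampType. A lookup like the one below makes it readable when you're debugging which clock a topic uses. The constant and function names are illustrative.

```python
# Kafka's TimestampType ids, as they appear in Spark's timestampType column.
KAFKA_TIMESTAMP_TYPES = {
    -1: "NoTimestampType",   # very old message formats that carry no timestamp
    0: "CreateTime",         # set on the producer side when the record is created
    1: "LogAppendTime",      # set by the broker when it appends the record
}


def describe_timestamp_type(type_id: int) -> str:
    """Hypothetical helper: map a timestampType value to Kafka's name for it."""
    return KAFKA_TIMESTAMP_TYPES.get(type_id, f"unknown({type_id})")
```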

Deserializing the value

The most common payload is JSON. Convert binary → string → struct:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

payload_schema = StructType([
    StructField("user_id",   StringType(),    False),
    StructField("action",    StringType(),    False),
    StructField("amount",    DoubleType(),    True),
    StructField("event_ts",  TimestampType(), False),
])

decoded = (events
             .select(col("topic"),
                     col("partition"),
                     col("offset"),
                     col("timestamp").alias("kafka_ts"),
                     from_json(col("value").cast("string"), payload_schema).alias("data"))
             .select("topic", "partition", "offset", "kafka_ts", "data.*"))

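There are three steps. col("value").cast("string") goes from binary to a UTF-8 string, from_json(..., schema) parses the JSON into a struct, and data.* flattens it. Forgetting the cast is still the most common Kafka-Spark slip, but it's a loud one: from_json(col("value"), schema) fails at analysis time with a data type mismatch, because from_json only accepts strings. The quiet gotcha is a schema that doesn't match the payload, such as a misspelled field name or a timestamp in a format Spark doesn't parse. In the default PERMISSIVE mode, from_json returns nulls for whatever it can't match instead of raising. The error is silent; you'll wonder why your filters match nothing.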
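
The decode path is easier to reason about when you see it outside Spark. This is a plain-Python analogue, not Spark's parser. Bytes are decoded as UTF-8 (the cast("string") step) and parsed as JSON. Any field the payload lacks comes back as None, roughly the way from_json yields nulls for fields that don't match the schema. The function name is illustrative.

```python
import json

# Field names mirror payload_schema from the from_json example.
FIELDS = ("user_id", "action", "amount", "event_ts")


def decode_value(value: bytes) -> dict:
    """Plain-Python analogue of from_json(col("value").cast("string"), payload_schema)."""
    text = value.decode("utf-8")               # the cast("string") step
    try:
        parsed = json.loads(text)              # the from_json step
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        # Unparseable payload: every field comes back empty, and nothing raises.
        return {name: None for name in FIELDS}
    # Fields the payload doesn't have (say, "userId" instead of "user_id") become None.
    # That's the silent-null trap: no error, your filters just stop matching.
    return {name: parsed.get(name) for name in FIELDS}
```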

Avro with a Confluent Schema Registry is a very common production setup. The pattern there is from_avro, which lives in pyspark.sql.avro.functions and needs the separate spark-avro package on the classpath, combined with the schema registry client. The Confluent wire format prefixes each message with a magic byte and a 4-byte schema ID, so you typically substring(col("value"), 6, ...) to skip the 5-byte prefix (substring positions are 1-based), then call from_avro with the schema fetched from the registry. The abris library wraps this nicely if you don't want to roll it yourself; the bare from_avro function works if you can hardcode or fetch the schema separately.
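
The five-byte Confluent prefix is simple enough to check by hand, which helps when from_avro output looks wrong. Here is a stdlib sketch. Byte 0 is the magic byte (0), bytes 1 to 4 are the schema ID as a big-endian unsigned int, and the Avro body starts at byte 5, which is position 6 in Spark's 1-based substring. The helper name is hypothetical.

```python
import struct
from typing import Tuple

CONFLUENT_MAGIC_BYTE = 0


def split_confluent_frame(value: bytes) -> Tuple[int, bytes]:
    """Hypothetical helper: return (schema_id, avro_body) from a Confluent-framed message."""
    if len(value) < 5:
        raise ValueError("message shorter than the 5-byte Confluent header")
    # ">bI": big-endian, one signed byte (magic) then a 4-byte unsigned int (schema ID).
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != CONFLUENT_MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    # Python slices are 0-based, so [5:] is the same bytes as substring(value, 6, ...) in Spark.
    return schema_id, value[5:]
```

The schema_id is what you'd look up in the registry. The body is what from_avro should receive.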

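Protobuf is a similar story. from_protobuf (in pyspark.sql.protobuf.functions, from the separate spark-protobuf package) has shipped since Spark 3.4, and the pattern is the same.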

For "I just have JSON, life is easy," from_json is what you reach for. In my experience, the large majority of Kafka-on-Spark pipelines are exactly this.

Offsets and startingOffsets

startingOffsets is the most important option you’ve never thought about. It controls where Spark begins reading from Kafka on the very first run of the query. After that, the checkpoint takes over.

Three forms:

.option("startingOffsets", "latest")    # default — start at end of each partition
.option("startingOffsets", "earliest")  # start at the beginning of retention
.option("startingOffsets", """
    {"user-events": {"0": 1234, "1": 5678, "2": 9012}}
""")  # explicit per-partition offsets

latest (the default) means “skip everything that’s already in Kafka and only process messages that arrive after I start.” Reasonable for real-time-only use cases where historical events don’t matter. Dangerous if you actually wanted history and didn’t realize the default was latest.

earliest means "read everything Kafka still has." For a topic with seven days of retention and lots of traffic, this is a backfill. Without a rate limit, Spark plans all of it as the very first micro-batch. With maxOffsetsPerTrigger (covered below), it works through the backlog in bounded micro-batches before catching up to "now." Use this when you legitimately want to bootstrap from history.

Explicit JSON lets you start from precise per-partition offsets — useful when migrating from another system that left off at known offsets, or when you want to skip a specific bad message.
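
Writing that JSON by hand is error-prone, so it's worth generating it. The sketch below uses json.dumps, and the helper name is hypothetical. Per the integration guide, -2 stands for earliest and -1 for latest inside this JSON, and partition numbers are object keys, so they're strings. A streaming query also expects every partition of the topic to be listed, and the sentinels let you fill in the ones you don't care about.

```python
import json

EARLIEST, LATEST = -2, -1  # sentinels the Kafka source accepts inside the offsets JSON


def starting_offsets_json(per_topic: dict) -> str:
    """Hypothetical helper: {"topic": {partition: offset}} -> startingOffsets JSON string."""
    return json.dumps({
        topic: {str(partition): int(offset) for partition, offset in partitions.items()}
        for topic, partitions in per_topic.items()
    })
```

For example, .option("startingOffsets", starting_offsets_json({"user-events": {0: 1234, 1: 5678, 2: EARLIEST}})) resumes partitions 0 and 1 at known offsets and reads partition 2 from the start of retention.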

A symmetric endingOffsets exists but only applies to batch reads (spark.read.format("kafka")), not streaming. Streaming queries are unbounded by definition.
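
A bounded batch replay sets both ends the same way. This sketch (the helper name is hypothetical) builds the pair. It also encodes the guide's rule that a batch read may not start at -1 (latest) or end at -2 (earliest). End offsets are exclusive.

```python
import json

EARLIEST, LATEST = -2, -1


def batch_read_options(topic: str, start: dict, end: dict) -> dict:
    """Hypothetical helper: options for spark.read.format("kafka") over a fixed offset range."""
    if LATEST in start.values():
        raise ValueError("batch startingOffsets cannot use -1 (latest)")
    if EARLIEST in end.values():
        raise ValueError("batch endingOffsets cannot use -2 (earliest)")
    return {
        "subscribe": topic,
        # Partition numbers become string keys; the ending offset is exclusive.
        "startingOffsets": json.dumps({topic: {str(p): o for p, o in start.items()}}),
        "endingOffsets": json.dumps({topic: {str(p): o for p, o in end.items()}}),
    }
```

Combine it with the bootstrap servers: spark.read.format("kafka").option("kafka.bootstrap.servers", "broker1:9092").options(**opts).load().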

After the first run, the checkpoint owns the offsets. startingOffsets is ignored on every restart that has a valid checkpoint. This is the correct behavior — the whole point of the checkpoint is to remember where you are. But it also means: if you change startingOffsets after deployment, nothing happens. To actually reset, you have to delete or relocate the checkpoint, which then re-applies startingOffsets on the next start.

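This is the second most common Kafka-Spark gotcha. An engineer changes latest → earliest to backfill, redeploys, sees no change, and gets confused. The fix is always: change startingOffsets, clear (or relocate) the checkpoint, and restart. If the sink is a file sink, like the Parquet job later in this lesson, point it at a fresh output path as well. The file sink keeps its own _spark_metadata log of committed batch IDs in the output directory, and a new query starts its batch IDs over at 0. It will then skip any batch that the log says was already written.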
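
The precedence rule is small enough to write down. This is a toy model, not Spark code. The checkpoint is a plain dict of recovered offsets, or None when the checkpoint directory is empty or gone, and startingOffsets only matters in the None case.

```python
from typing import Optional


def resolve_start(checkpoint_offsets: Optional[dict], starting_offsets):
    """Toy model of where a Kafka streaming query resumes.

    checkpoint_offsets: offsets recovered from the checkpoint, or None if there is none.
    starting_offsets: the value passed to .option("startingOffsets", ...).
    """
    if checkpoint_offsets is not None:
        # A valid checkpoint wins; the startingOffsets option is silently ignored.
        return checkpoint_offsets
    # First run, or checkpoint deleted: the option decides.
    return starting_offsets
```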

At-least-once is built in. Exactly-once needs help.

Spark’s Kafka connector gives you at-least-once semantics out of the box, with a checkpoint:

  1. Spark plans a micro-batch covering offsets [a, b) per partition and records that range in the checkpoint's offset log.
  2. Spark processes it and writes to the sink.
  3. Spark commits offsets [a, b) to the checkpoint.

If the job dies between step 2 and step 3, on restart Spark will re-read [a, b) and re-process those messages. The output side may have already received some of them. Result: some rows can appear twice.
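
The failure window is easy to reproduce without Spark. In this toy simulation (all names are invented), a batch is written to an append-only sink and the job "crashes" before the commit. The restart then replays the same offset range:

```python
def run_batch(log, start, end, sink, committed, crash_before_commit=False):
    """Toy micro-batch: read log[start:end], append to sink, then commit `end`."""
    sink.extend(log[start:end])          # step 2: write to the sink
    if crash_before_commit:
        raise RuntimeError("driver died before the commit")
    committed.append(end)                # step 3: record progress in the "checkpoint"


log = ["e0", "e1", "e2", "e3"]
sink, committed = [], [0]

try:
    run_batch(log, committed[-1], 2, sink, committed, crash_before_commit=True)
except RuntimeError:
    pass                                 # restart resumes from the last commit: offset 0

run_batch(log, committed[-1], 2, sink, committed)   # replay of the same [0, 2) range
run_batch(log, committed[-1], 4, sink, committed)

print(sink)   # ['e0', 'e1', 'e0', 'e1', 'e2', 'e3']
```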

For idempotent sinks, this is fine. Examples of idempotent sinks:

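  • Delta Lake / Iceberg with MERGE keyed by event ID, run from foreachBatch. Re-running is effectively a no-op for already-merged rows.
  • JDBC with an upsert (INSERT ... ON CONFLICT DO UPDATE in Postgres, MERGE in SQL Server) keyed by a stable ID. Spark has no built-in streaming JDBC sink, so this also lives in foreachBatch.
  • Not Kafka, despite the transactional producer. Spark's Kafka sink is at-least-once. The producer's enable.idempotence setting only dedupes retries within one producer session, not a replayed micro-batch, so anything reading the output topic should be ready to dedupe.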
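
The JDBC upsert case is the easiest to see in miniature. Here sqlite3 stands in for the database, since its INSERT ... ON CONFLICT DO UPDATE syntax matches Postgres for this case. The table and column names are made up. In a real job, this write would run inside foreachBatch against your actual database.

```python
import sqlite3


def upsert_batch(conn: sqlite3.Connection, rows: list) -> None:
    """Write (event_id, action, amount) rows idempotently, keyed by event_id."""
    with conn:  # one transaction per batch
        conn.executemany(
            """
            INSERT INTO purchases (event_id, action, amount)
            VALUES (?, ?, ?)
            ON CONFLICT(event_id) DO UPDATE SET
                action = excluded.action,
                amount = excluded.amount
            """,
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (event_id TEXT PRIMARY KEY, action TEXT, amount REAL)")

# Event IDs built as topic-partition-offset.
batch = [("user-events-0-10", "purchase", 9.5), ("user-events-0-11", "purchase", 3.0)]
upsert_batch(conn, batch)
upsert_batch(conn, batch)   # at-least-once replay of the same micro-batch

count = conn.execute("SELECT COUNT(*) FROM purchases").fetchone()[0]
```

After the replay, count is still 2. The second write updated the existing rows in place instead of adding new ones.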

For non-idempotent sinks, at-least-once means duplicates. Options:

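  • Make it idempotent at the application layer. Add an event ID to your records and have the downstream consumer dedupe on it. A producer-assigned business ID is best. (topic, partition, offset) also works, since the offset alone is only unique within a partition, but it won't catch duplicates the producer itself wrote. Cheap, almost always worth doing.
  • Use foreachBatch with a transactional write. You get each micro-batch as a DataFrame plus its batch_id. Write the data and that batch_id (or the offset range) to the target in the same transaction, and skip any batch_id that's already recorded. If the transaction commits, you're done; if it fails, neither side advanced.
  • Use Delta or Iceberg as the sink. Delta's streaming sink records each query's last committed batch ID in its transaction log, so a retried micro-batch doesn't double-write. For writes inside foreachBatch, the txnAppId and txnVersion options give the same protection. Iceberg's streaming writes track committed batches in a similar way.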
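
Here is the foreachBatch option in miniature. sqlite3 stands in for the transactional target. write_batch is a hypothetical function shaped like a foreachBatch callback (rows plus a batch_id), but it takes a plain list instead of a DataFrame. The rows and the batch_id land in one transaction, so a replayed batch is detected and skipped:

```python
import sqlite3


def write_batch(conn: sqlite3.Connection, rows: list, batch_id: int) -> bool:
    """Apply one micro-batch exactly once; return False if it was already applied."""
    with conn:  # commit on success, roll back if anything below raises
        seen = conn.execute(
            "SELECT 1 FROM applied_batches WHERE batch_id = ?", (batch_id,)
        ).fetchone()
        if seen:
            return False  # replay after a crash: the data is already there
        conn.executemany("INSERT INTO purchases (user_id, amount) VALUES (?, ?)", rows)
        # The PRIMARY KEY on batch_id is the real safety net: a concurrent duplicate
        # insert would fail here and roll back the whole batch.
        conn.execute("INSERT INTO applied_batches (batch_id) VALUES (?)", (batch_id,))
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (user_id TEXT, amount REAL)")
conn.execute("CREATE TABLE applied_batches (batch_id INTEGER PRIMARY KEY)")

first = write_batch(conn, [("u1", 9.5), ("u2", 3.0)], batch_id=7)
replay = write_batch(conn, [("u1", 9.5), ("u2", 3.0)], batch_id=7)
rows = conn.execute("SELECT COUNT(*) FROM purchases").fetchone()[0]
```

One caveat concerns the bookkeeping key. Batch IDs start over at 0 when you reset a checkpoint, so in practice you'd key the table by something like (job name, batch_id) and clear those rows together with the checkpoint.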

There’s no single button labeled “exactly-once” in Structured Streaming. There’s at-least-once + idempotent sink = effectively exactly-once. Plan accordingly.

maxOffsetsPerTrigger: the backfill safety valve

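Here's a scenario that bites people. You set startingOffsets=earliest on a topic with 7 days of retention and 100 GB of data. You start the query. Spark plans everything as the first micro-batch, the batch runs out of executor memory (or simply never finishes), and the job dies. That batch's offset range was already written to the checkpoint's offset log before it ran, so the restart replays the same oversized batch and dies again, in an infinite loop. Adding a limit afterward doesn't shrink a batch that's already been planned.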

The fix is maxOffsetsPerTrigger. It caps the number of new offsets read across all partitions per micro-batch:

.option("maxOffsetsPerTrigger", 1_000_000)

With this, the first micro-batch reads up to 1M offsets (records, not bytes), processes them, commits, and the next micro-batch reads the next 1M. The backfill takes many micro-batches, roughly the backlog's offset count divided by 1M. Each one is bounded, though, so the OOMs go away. Once the query catches up to "now," each micro-batch reads only the new arrivals, which is usually well under the cap.
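
The cap is a total across partitions, so it has to be split. The Kafka source splits it in proportion to each partition's backlog, rounding down, except that shares under one offset round up so small partitions aren't starved. The sketch below models that behavior in plain Python. rate_limit is a hypothetical name, and exact rounding details may vary by Spark version.

```python
import math


def rate_limit(start: dict, available: dict, max_offsets: int) -> dict:
    """Split a maxOffsetsPerTrigger budget across partitions in proportion to backlog.

    start: next offset to read per partition; available: latest offset per partition.
    Returns the exclusive end offset per partition for the next micro-batch.
    """
    backlog = {p: available[p] - start[p] for p in available if available[p] > start[p]}
    total = sum(backlog.values())
    if total == 0:
        return dict(available)          # nothing new anywhere
    ends = {}
    for p, latest in available.items():
        if p not in backlog:
            ends[p] = latest
            continue
        share = max_offsets * (backlog[p] / total)
        # Round down, except shares below one offset round up so no partition starves.
        step = math.ceil(share) if share < 1 else math.floor(share)
        ends[p] = min(latest, start[p] + step)
    return ends
```

The catch-up arithmetic follows directly. A hypothetical 250M-offset backlog at a 1M cap takes about 250 micro-batches, plus whatever arrives in the meantime.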

In production I always set this for any topic with non-trivial backlog potential. Default is unlimited, which is a footgun on first deployment.

There’s also minOffsetsPerTrigger (Spark 3.4+), which says “wait until at least N new messages exist before triggering” — useful for low-volume topics where you don’t want to fire a micro-batch for every single message. Pair it with maxTriggerDelay to bound how long Spark will wait.
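
Together, the two options amount to a small decision each time Spark checks the source. This is a simplified model of the documented behavior, not the connector's code, and should_trigger is a made-up name. Spark fires once the backlog reaches minOffsetsPerTrigger, or once maxTriggerDelay has elapsed with at least some data waiting. The 15-minute default for maxTriggerDelay matches the integration guide.

```python
from datetime import timedelta


def should_trigger(available: int, since_last: timedelta,
                   min_offsets: int, max_delay: timedelta = timedelta(minutes=15)) -> bool:
    """Simplified model of minOffsetsPerTrigger + maxTriggerDelay.

    available: new offsets summed across partitions; since_last: time since the last batch.
    """
    if available >= min_offsets:
        return True                      # enough data has accumulated
    # Otherwise keep waiting, but not past max_delay once something is waiting.
    return available > 0 and since_last >= max_delay
```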

A complete Kafka-to-Parquet streaming job

Let’s put it all together. Read JSON events from Kafka, parse, filter, write to Parquet, with proper checkpointing and offset bounds:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = (SparkSession.builder
         .appName("KafkaToParquet")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0")
         .getOrCreate())

payload_schema = StructType([
    StructField("user_id",   StringType(),    False),
    StructField("action",    StringType(),    False),
    StructField("amount",    DoubleType(),    True),
    StructField("event_ts",  TimestampType(), False),
])

raw = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
         .option("subscribe", "user-events")
         .option("startingOffsets", "earliest")
         .option("maxOffsetsPerTrigger", 500_000)
         .option("failOnDataLoss", "false")
         .load())

decoded = (raw
             .select(col("partition"),
                     col("offset"),
                     col("timestamp").alias("kafka_ts"),
                     from_json(col("value").cast("string"), payload_schema).alias("d"))
             .select("partition", "offset", "kafka_ts", "d.*"))

purchases = decoded.filter((col("action") == "purchase") & (col("amount") > 0))

query = (purchases.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "s3://my-bucket/events/purchases/")
            .option("checkpointLocation", "s3://my-bucket/checkpoints/purchases/")
            .trigger(processingTime="60 seconds")
            .start())

query.awaitTermination()

What’s worth pointing out:

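  • spark.jars.packages brings in the Kafka connector. Without it, the first read fails with an AnalysisException along the lines of "Failed to find data source: kafka". The setting only takes effect when the session's JVM starts. In a notebook where a session is already running, pass --packages at launch instead.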
  • failOnDataLoss=false tells Spark not to crash if it discovers that the offsets it expected to read have been deleted by Kafka retention. The default is true (crash loudly), which is correct for jobs where missing data is unacceptable. For best-effort backfill jobs, false lets Spark skip ahead with a warning rather than dying.
  • The checkpoint is on S3. That's fine for production deployments because S3 is durable. Don't put it on local disk in a clustered job; node failures will lose it. (The s3:// scheme assumes EMR or Databricks; with stock Hadoop you'd use s3a://.)
  • trigger(processingTime="60 seconds") matches a 1-minute landing cadence for downstream consumers. Tune to taste.
  • The output is Parquet partitioned by nothing right now. For a real production job, derive a date column, say withColumn("event_date", to_date("event_ts")), and add .partitionBy("event_date") to keep Parquet directories navigable. It's left out here for clarity.

Run this. Drop messages onto user-events. Watch Parquet files appear in s3://my-bucket/events/purchases/. Kill the job. Drop more messages. Restart. Spark resumes from the checkpoint, reads only the new messages (because old offsets are committed), and continues.

A few last things I’d tell a colleague

  • group.id is managed by Spark, not you. By default, each query gets its own generated group ID, and you can set the prefix with groupIdPrefix. Since Spark 3.0 you can force one with kafka.group.id, for example to satisfy group-based ACLs. Do it with care, because concurrent queries sharing a group ID interfere with each other. Either way, Spark tracks offsets in its checkpoint, not in Kafka's consumer group offsets, and it doesn't commit progress back to Kafka. This catches people who came from Kafka Streams or kafka-python; the mental model is different.
  • One Spark task per Kafka partition, by default. If your topic has 12 partitions, each micro-batch has 12 input tasks. Want more read parallelism? Set minPartitions, which splits large offset ranges into several tasks, or repartition after the read. Want fewer downstream? Use coalesce. Without minPartitions, the Kafka partition count caps your read parallelism; talk to your platform team if it's too low.
  • Headers have been available since Spark 3.0. Set includeHeaders to "true" on the source to get the headers column; you can also write headers on the Kafka sink. They're good for tracing IDs, schema versions, and tenant tags.
  • Watch out for time skew between brokers and processors. Kafka's timestamp field is set either on the producer side (CreateTime) or by the broker (LogAppendTime), depending on the topic's configuration. If you use it for windowing, double-check you're using the time you mean. Better: use a field in the message body that you control.
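
The following deliberately simplified model shows how minPartitions changes task counts; split_ranges is a hypothetical name. When there are fewer non-empty Kafka ranges than minPartitions, each range is cut into pieces roughly in proportion to its share of the total, so large partitions get more tasks. Spark's actual splitter handles small ranges a little differently, so exact boundaries can differ.

```python
def split_ranges(ranges: dict, min_partitions: int) -> list:
    """Simplified model of the Kafka source's minPartitions option.

    ranges: {kafka_partition: (from_offset, until_offset)}, until exclusive.
    Returns a list of (kafka_partition, from_offset, until_offset) read tasks.
    """
    sized = {p: r for p, r in ranges.items() if r[1] > r[0]}
    if len(sized) >= min_partitions:
        # Already enough ranges: one Spark task per Kafka partition with data.
        return [(p, start, end) for p, (start, end) in sized.items()]
    total = sum(end - start for start, end in sized.values())
    tasks = []
    for p, (start, end) in sized.items():
        size = end - start
        parts = max(1, round(size / total * min_partitions))
        for i in range(parts):
            # Integer boundaries that cover [start, end) with no gaps or overlaps.
            lo = start + size * i // parts
            hi = start + size * (i + 1) // parts
            tasks.append((p, lo, hi))
    return tasks
```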

That covers the source. Lesson 52 is event-time and watermarks — the piece that lets you write groupBy(window(...)) on a Kafka stream and get correct results even when events arrive out of order. After that, output modes (53), stateful operations (54), stream-stream joins (55), and we’ll have a complete streaming toolkit.


References: Apache Spark Structured Streaming + Kafka Integration Guide (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) and the Kafka client documentation (https://kafka.apache.org/documentation/). Retrieved 2026-05-01.
