PySpark, from the ground up Lesson 47 / 60

Cloud storage: S3, GCS, Azure Blob — what changes

The consistency caveats, the rename problem, and why direct-write committers exist.

If you’ve been using Spark in the last few years, the odds that your data lives on actual HDFS are very low. Most of us read and write s3://..., gs://..., or abfs://... paths. The cluster comes up, the cluster goes away, the data persists in object storage. That’s the model.

It’s a great model. It’s also a model where Spark — originally designed against HDFS — leaks abstractions in interesting ways. Object stores aren’t filesystems, even when the URLs make them look like ones. This lesson is about what changes when your “filesystem” is really a key-value store, and what to configure so Spark behaves itself.

The conceptual difference

A filesystem like HDFS has directories. A directory is a real entity with metadata, atomic operations, and a parent-child tree. mv /old/path /new/path is a metadata pointer change — instant, atomic.

S3 has none of that. S3 is a key-value store. Keys are flat strings. There’s no /year=2024/month=03/ directory; there are just objects whose keys happen to start with year=2024/month=03/. The “directory” you see in the AWS console is a UI affectation built by listing keys with a common prefix.
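To see how a “directory” falls out of a flat key space, here is a toy, in-memory sketch of listing with a prefix and a / delimiter, roughly the way S3’s ListObjectsV2 rolls keys up into common prefixes. The function name and the keys are made up for illustration; this is not the AWS SDK.

```python
def list_with_delimiter(keys, prefix="", delimiter="/"):
    """Toy listing: split keys under `prefix` into objects and "folders".

    Keys with another `delimiter` after the prefix are rolled up into a
    common prefix, which is all a console "folder" really is.
    """
    objects, common_prefixes = [], set()
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        cut = rest.find(delimiter)
        if cut == -1:
            objects.append(key)  # an object directly "inside" the prefix
        else:
            common_prefixes.add(prefix + rest[:cut + 1])  # a pseudo-directory
    return objects, sorted(common_prefixes)


keys = [
    "year=2024/month=03/part-00000.parquet",
    "year=2024/month=03/part-00001.parquet",
    "year=2024/month=04/part-00000.parquet",
    "year=2024/_SUCCESS",
]
print(list_with_delimiter(keys, prefix="year=2024/"))
# -> (['year=2024/_SUCCESS'], ['year=2024/month=03/', 'year=2024/month=04/'])
```

Delete the two month=03 keys and month=03/ vanishes from the listing; nothing else ever recorded that it existed.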

The consequences:

  • No atomic rename. aws s3 mv is copy plus delete. For a single small object that’s fast. For a 50 GB object it’s slow. For a “directory” of 10000 objects it’s 10000 separate copy-delete pairs.
  • No empty directories. A “directory” exists if and only if at least one key has it as a prefix. Hadoop’s S3A connector, which Spark uses for s3a:// paths, works around this by writing zero-byte “directory marker” objects whose keys end in /.
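  • List operations are paged and slow. A single LIST request returns at most 1,000 keys, so listing a prefix that holds 10000 objects takes at least ten round trips, and heavy listing can be throttled. Listings are strongly consistent today, though: since December 2020, a prefix that just had 10000 objects added returns all of them.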
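A directory rename on an object store is a loop, not one operation. This toy sketch renames every key under one prefix in a dict-backed “bucket” and counts the requests, assuming 1,000 keys per LIST page and one COPY plus one DELETE per object, as described above. Real connectors parallelize copies and can batch deletes; the model ignores that, and the function name is hypothetical.

```python
import math

PAGE_SIZE = 1000  # a single ListObjectsV2 call returns at most 1,000 keys


def rename_prefix(bucket, src, dst):
    """Toy "directory rename" on a dict-backed bucket, counting S3 requests.

    Each object is copied to its new key and then deleted, one at a time,
    so a crash partway through would leave keys under both prefixes.
    """
    to_move = sorted(k for k in bucket if k.startswith(src))
    counts = {
        "LIST": max(1, math.ceil(len(to_move) / PAGE_SIZE)),
        "COPY": 0,
        "DELETE": 0,
    }
    for key in to_move:
        bucket[dst + key[len(src):]] = bucket[key]  # server-side copy, cost ~ object size
        counts["COPY"] += 1
        del bucket[key]  # then remove the original
        counts["DELETE"] += 1
    return counts


bucket = {f"out/_temporary/part-{i:05d}.parquet": b"data" for i in range(10000)}
print(rename_prefix(bucket, "out/_temporary/", "out/"))
# -> {'LIST': 10, 'COPY': 10000, 'DELETE': 10000}
```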

GCS and Azure Blob have similar models. Different APIs, similar tradeoffs.

The rename problem

Here’s why this matters for Spark.

When Spark writes a DataFrame to an output path, here’s what the standard FileOutputCommitter (algorithm v1) does:

  1. Each task writes its output to a temporary path: s3a://bucket/output/_temporary/0/_temporary/attempt_xxx/part-00000-...parquet.
  2. When the task completes, it commits: once the driver authorizes the commit, the task renames its attempt directory into the job-temp path.
  3. When the entire job completes, the driver renames everything from the job-temp path to the final output path.

On HDFS, those renames are single metadata operations on the NameNode, taking milliseconds no matter how much data the files hold. The whole commit step is functionally free.

On S3, every “rename” is a recursive copy-then-delete of every part file. For a job that produces 1000 part files of 100 MB each, the commit step alone copies 100 GB across S3, then deletes the originals. I’ve seen production jobs where the actual computation took 10 minutes and the commit took 40. The job appears stuck at “99% complete” while the driver crawls through copy operations.
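A toy model of the commit cost makes the difference between strategies concrete. It assumes each file rename costs one full server-side copy. Under that assumption v1 renames every file twice (task commit, then job commit), so the 1000 × 100 MB job copies 200 GB in total; the 100 GB job-commit share is the part the driver performs after all tasks finish.

```python
MB = 1000 ** 2  # decimal units, so 1000 x 100 MB = 100 GB as in the text
GB = 1000 ** 3

# Toy model: how many times each output file is renamed (copied) at commit.
RENAMES_PER_FILE = {
    "v1": 2,     # task commit: attempt dir -> job-temp; job commit: -> final
    "v2": 1,     # task commit renames straight into the final directory
    "magic": 0,  # parts already went to the final keys; commit only completes uploads
}


def commit_copy_bytes(num_files, file_bytes, algorithm):
    """Bytes S3 copies during commit, assuming one full copy per rename."""
    return num_files * file_bytes * RENAMES_PER_FILE[algorithm]


for algo in RENAMES_PER_FILE:
    print(algo, commit_copy_bytes(1000, 100 * MB, algo) // GB, "GB")
# v1 200 GB
# v2 100 GB
# magic 0 GB
```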

It also fails awkwardly. If the driver dies during the rename, some files are already in their final location and the rest are still in the temporary path. There’s no atomic rollback because there’s no atomic rename.

This is the rename problem.

Direct-write committers

The fix is committers that don’t rename. They write directly to the final location and use S3’s multipart-upload protocol to defer the “make this object visible” step until commit time. Multipart upload is naturally two-phase: you upload parts, then you call CompleteMultipartUpload to make the assembled object appear. If you never call complete, nothing appears.
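The two-phase shape is easy to see in a toy model. The class and method names below are made up; they only mirror the CreateMultipartUpload, UploadPart, CompleteMultipartUpload and AbortMultipartUpload sequence, in memory:

```python
import itertools


class ToyBucket:
    """In-memory stand-in for a bucket that supports multipart uploads."""

    def __init__(self):
        self.objects = {}  # visible key -> bytes
        self.pending = {}  # upload_id -> (key, {part_number: bytes})
        self._ids = itertools.count(1)

    def create_multipart_upload(self, key):
        upload_id = next(self._ids)
        self.pending[upload_id] = (key, {})
        return upload_id

    def upload_part(self, upload_id, part_number, data):
        # Parts are stored, but no object is visible under the key yet.
        self.pending[upload_id][1][part_number] = data

    def complete_multipart_upload(self, upload_id):
        # Phase two: assemble parts in order and publish the object.
        key, parts = self.pending.pop(upload_id)
        self.objects[key] = b"".join(parts[n] for n in sorted(parts))

    def abort_multipart_upload(self, upload_id):
        # Discard the parts; the key never appears.
        self.pending.pop(upload_id)


bucket = ToyBucket()
uid = bucket.create_multipart_upload("out/part-00000.parquet")
bucket.upload_part(uid, 2, b"world")
bucket.upload_part(uid, 1, b"hello ")
assert "out/part-00000.parquet" not in bucket.objects  # task done, nothing visible
bucket.complete_multipart_upload(uid)                   # the "job commit"
print(bucket.objects)
# -> {'out/part-00000.parquet': b'hello world'}
```

A committer maps onto this directly: tasks do the first two steps, job commit issues the completes, and job abort issues the aborts. On real S3, the parts of an upload that is never completed or aborted stay in the bucket (and are billed) until removed, which is why a lifecycle rule that aborts incomplete multipart uploads is a common safeguard.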

Several implementations exist:

  • S3A magic committer — the open-source one, ships with Hadoop 3.x. Tasks upload parts to the final S3 keys but defer the multipart-upload completion. The driver completes them all at job commit. Very fast, no renames.
  • EMRFS S3-optimized committer — AWS EMR’s version, broadly equivalent.
  • Databricks DBIO committers — Databricks’ proprietary version. Same idea.
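  • Hadoop FileOutputCommitter algorithm v2 — not S3-aware, but a partial improvement: it skips the second-stage rename by promoting task output to the final location at task-commit time (set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2). Still does one round of renames, still slower than the magic committer, but better than v1. The catch: a task that fails partway through its commit can leave partial output visible in the final directory.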

If you’re on EMR, Databricks, or Dataproc, the platform usually wires this up for you. If you’re running Spark on plain Kubernetes or EC2 against S3, you have to configure it.

Configuring the S3A magic committer

Open-source Hadoop 3 with the magic committer:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("S3App")
    # Use the S3A connector
    .config("spark.hadoop.fs.s3a.impl",
            "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Tell Spark to use the path-output-committer protocol
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    # And use the magic committer for s3a paths
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    # Required: Parquet's normal committer doesn't compose with magic
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    # Note: fs.s3a.committer.staging.conflict-mode is not set here; it only
    # affects the staging committers ("directory", "partitioned"), not "magic".
    .getOrCreate())

Yes, that’s a lot of options. They all do something. The pattern repeats: tell Spark which commit protocol to use, tell Hadoop which committer factory to use for the s3a scheme, and patch Parquet so its custom committer doesn’t override the magic one.
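Since the same four committer keys show up in every job, it can help to keep them in one place. A sketch with a hypothetical helper name, assuming Spark’s spark-hadoop-cloud module is on the classpath; the keys and class names are the ones from the block above, grouped by the three jobs they do:

```python
def s3a_committer_conf(committer="magic"):
    """Spark config entries that route s3a:// writes through an S3A committer.

    Hypothetical helper. `committer` goes into fs.s3a.committer.name:
    "magic", or one of the staging committers "directory" / "partitioned".
    """
    if committer not in {"magic", "directory", "partitioned"}:
        raise ValueError(f"unknown S3A committer: {committer}")
    return {
        # 1. Spark: use the commit protocol that delegates to a PathOutputCommitter
        "spark.sql.sources.commitProtocolClass":
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
        # 2. Hadoop: build committers for the s3a scheme with the S3A factory
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        "spark.hadoop.fs.s3a.committer.name": committer,
        # 3. Parquet: bind Parquet's committer setting to the factory's choice
        "spark.sql.parquet.output.committer.class":
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    }
```

Apply it with a loop such as for k, v in s3a_committer_conf().items(): builder = builder.config(k, v) before calling getOrCreate(). Passing "directory" instead of "magic" switches to a staging committer without touching the rest.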

Once configured, your writes look like normal Spark writes:

(df.write
   .mode("overwrite")
   .partitionBy("country")
   .parquet("s3a://my-bucket/datasets/orders/"))

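But the commit phase now typically takes seconds instead of minutes: job commit just sends one small completion request per file. Each file also appears atomically, so you never see a half-written object in the bucket. The job as a whole is not atomic, though; files become visible one by one as the driver completes their uploads.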

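You also need the right JARs on the classpath: hadoop-aws (which pulls in the matching AWS SDK bundle) and Spark’s spark-hadoop-cloud module, which provides PathOutputCommitProtocol and BindingParquetOutputCommitter. Match hadoop-aws to the exact Hadoop version your Spark distribution was built with; mismatches give you bewildering NoSuchMethodError exceptions at write time.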
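One way to keep the coordinates consistent is to derive them from the versions you actually run. A sketch with a hypothetical helper name; the coordinate patterns are the published Maven ones (org.apache.hadoop:hadoop-aws and org.apache.spark:spark-hadoop-cloud_<scala version>), but the version numbers in the example are illustrative and must be checked against your distribution:

```python
def cloud_packages(hadoop_version, spark_version, scala_version="2.12"):
    """Build a spark.jars.packages value for S3A plus Spark's cloud committers.

    hadoop-aws must match the Hadoop version inside your Spark distribution
    exactly; it brings in the matching AWS SDK bundle transitively.
    """
    return ",".join([
        f"org.apache.hadoop:hadoop-aws:{hadoop_version}",
        f"org.apache.spark:spark-hadoop-cloud_{scala_version}:{spark_version}",
    ])


print(cloud_packages("3.3.4", "3.5.1"))
# -> org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-hadoop-cloud_2.12:3.5.1
```

To find the Hadoop version a running session was built with, spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion() works, though _jvm is an internal py4j handle rather than public API.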

Consistency: the old story is dead

If you read S3 documentation from before December 2020, you’ll see warnings about eventual consistency: “after writing an object, a subsequent list might not see it; a read-after-overwrite might return the old version.” Hadoop’s S3A connector had to work around this with S3Guard (a DynamoDB-backed consistency cache), and jobs piled on various retry layers.

That’s all gone. AWS announced in December 2020 that S3 is strongly read-after-write consistent for all operations: PUT, GET, LIST, DELETE, all of it. If your write succeeds, every subsequent read across all clients sees the new version. S3Guard was deprecated in Hadoop 3.3 and removed in 3.4.

In 2026, you can safely ignore consistency caveats in old blog posts and Stack Overflow answers. If you’re still configuring S3Guard, stop. You’re paying for a DynamoDB table you don’t need.

GCS and Azure Blob have offered strong consistency for longer. The same simplification applies.

The s3 / s3a / s3n naming history

You’ll see three URL schemes in old code, and only one is current:

  • s3:// — original Hadoop scheme, backed by a block-based filesystem stored on S3. Long deprecated. Modern AWS uses this URL form in CLI/SDKs to mean “the standard S3 object protocol”, but in Hadoop/Spark configurations it historically meant something different. Don’t use s3:// in Spark URIs against open-source Hadoop.
  • s3n:// — second-generation, native S3 backend. Removed in Hadoop 3.
  • s3a:// — current generation, ships with Hadoop 2.7+. Performance-tuned, supports the magic committer, supports IAM roles, supports server-side encryption. This is the one to use.

The exception: AWS EMR uses s3:// for EMRFS, AWS’s own S3 connector. So on EMR specifically, s3:// works and is the default. On Databricks, use s3a://, or the Databricks-specific dbfs:/ scheme for mounted paths. On self-managed Spark, such as plain Kubernetes or EC2, always use s3a://.

If you write portable code, parameterize the path, scheme included:

import os

input_path = os.environ.get("INPUT_PATH", "s3a://my-bucket/raw/")
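If only the scheme differs between platforms (s3:// on EMR, s3a:// elsewhere), a small helper can swap it. A sketch; the function name and the S3_SCHEME environment variable are hypothetical conventions for this example, not something Spark or EMR reads:

```python
import os
from urllib.parse import urlsplit, urlunsplit

S3_SCHEMES = {"s3", "s3a", "s3n"}


def with_s3_scheme(path, scheme=None):
    """Rewrite an s3/s3a/s3n URI to use `scheme` (default: $S3_SCHEME or "s3a").

    S3_SCHEME is a hypothetical environment variable for this sketch.
    Non-S3 paths (gs://, abfs://, local paths) are returned unchanged.
    """
    scheme = scheme or os.environ.get("S3_SCHEME", "s3a")
    parts = urlsplit(path)
    if parts.scheme not in S3_SCHEMES:
        return path
    return urlunsplit(parts._replace(scheme=scheme))


print(with_s3_scheme("s3a://my-bucket/raw/", "s3"))
# -> s3://my-bucket/raw/
```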

Authentication: the IAM role best practice

Three ways to authenticate. From worst to best:

Hardcoded keys in the SparkSession config. Don’t.

# DO NOT DO THIS
.config("spark.hadoop.fs.s3a.access.key", "AKIA...")
.config("spark.hadoop.fs.s3a.secret.key", "...")

Keys end up in logs, in process listings, in screenshots. Even if you “rotate them later,” you’ve leaked them.

Environment variables. Better. The S3A connector reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY automatically via the default credential provider chain. This works for local development against AWS sandbox accounts. Still don’t do it in production — env vars on shared compute are visible to anyone who can introspect the process.

Instance profiles / IAM roles for service accounts. The right answer.

  • On EC2: attach an IAM instance profile to the executor nodes. The S3A connector calls the EC2 instance metadata service to get short-lived credentials. No keys anywhere.
  • On EKS / Kubernetes: use IRSA (IAM Roles for Service Accounts). The pod gets a projected token; the S3A connector exchanges it for STS credentials. No keys.
  • On EMR: the cluster role is automatic.

For S3A, you can point the connector at the AWS SDK’s default provider chain:

.config("spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

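That chain (in the v1 AWS SDK that Hadoop 3.3 uses) tries environment variables, Java system properties, web identity tokens (what IRSA provides), the AWS profile file, container credentials, and finally the EC2 instance metadata service, in that order. Set up an IAM role and the chain finds it without any explicit key config.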
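The chain idea is simple: ask each source in turn and use the first one that has credentials. The toy Python model below uses hypothetical provider functions and covers only three sources; the real chain lives in the AWS SDK for Java. It shows why an IAM role “just works” when nothing earlier is configured, and why a stray AWS_ACCESS_KEY_ID in the environment silently wins over it:

```python
def env_provider(env, _meta):
    # Stand-in for the SDK's environment-variable provider
    if "AWS_ACCESS_KEY_ID" in env and "AWS_SECRET_ACCESS_KEY" in env:
        return ("env", env["AWS_ACCESS_KEY_ID"])
    return None


def web_identity_provider(env, _meta):
    # IRSA on EKS injects these two variables into the pod
    if "AWS_ROLE_ARN" in env and "AWS_WEB_IDENTITY_TOKEN_FILE" in env:
        return ("web-identity", env["AWS_ROLE_ARN"])
    return None


def instance_profile_provider(_env, meta):
    role = meta.get("instance_role")  # stand-in for the EC2 metadata service
    return ("instance-profile", role) if role else None


CHAIN = [env_provider, web_identity_provider, instance_profile_provider]


def resolve(env, meta, chain=CHAIN):
    """Return the first credentials any provider in `chain` can supply."""
    for provider in chain:
        creds = provider(env, meta)
        if creds is not None:
            return creds
    raise RuntimeError("no credentials found in chain")


print(resolve({}, {"instance_role": "etl-role"}))
# -> ('instance-profile', 'etl-role')
```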

GCS uses workload identity on GKE or service account JSON keys (which have the same “don’t leak them” caveats). Azure Blob uses managed identities or SAS tokens. Same principle: prefer the platform’s identity mechanism over key material in code.

GCS and Azure: same story, different connector

The conceptual landscape is identical. Different connector classes, different config prefixes.

GCS:

.config("spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

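URIs are gs://bucket/path/. The GCS connector also implements directory rename as per-object copies, so the rename problem still applies; Hadoop 3.3.5 and later include a manifest committer designed for GCS and ABFS that reduces the commit cost. Reads and listings on GCS are strongly consistent.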

Azure Data Lake Storage Gen2 uses the ABFS driver:

.config("spark.hadoop.fs.abfs.impl",
        "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")

URIs are abfs://container@account.dfs.core.windows.net/path/. ADLS Gen2 with hierarchical namespace enabled is closer to a real filesystem than S3 — directories are first-class and rename is atomic. You can use the standard FileOutputCommitter without paying the rename tax.

A working S3 read-and-write example

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("S3Demo")
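    # Versions are illustrative (this assumes a Spark 3.5.1 build on Hadoop
    # 3.3.4): hadoop-aws must match your Spark build's Hadoop version exactly,
    # and spark-hadoop-cloud must match your Spark and Scala versions.
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "org.apache.spark:spark-hadoop-cloud_2.12:3.5.1")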
    # Filesystem
    .config("spark.hadoop.fs.s3a.impl",
            "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    # Magic committer
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    # Modest connection pool, retries
    .config("spark.hadoop.fs.s3a.connection.maximum", "100")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
    .getOrCreate())

# Read
events = spark.read.parquet("s3a://my-bucket/raw/events/year=2026/")

# Aggregate
daily = (events
         .groupBy("country", "event_date")
         .count())

# Write — partitioned, magic-committer-backed
(daily.write
      .mode("overwrite")
      .partitionBy("event_date")
      .parquet("s3a://my-bucket/curated/daily_events/"))

If you want to verify the magic committer is active, look at the driver log on write. You should see lines like Using committer: MagicS3GuardCommitter. If you see FileOutputCommitter algorithm v1, you’re back on the slow path; check the four-config block above for typos.
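A second check that doesn’t depend on log wording: the S3A committers write a JSON _SUCCESS file at the root of the output directory, with a committer field naming the committer, while the classic FileOutputCommitter writes a zero-byte _SUCCESS. A sketch that classifies the file’s bytes; the function name is hypothetical, and it assumes the S3A summary format includes that committer field:

```python
import json


def committer_from_success(data: bytes) -> str:
    """Guess which committer wrote a _SUCCESS marker from its contents.

    Assumes the S3A committers' JSON summary, which has a "committer" field;
    an empty file is what the classic FileOutputCommitter writes.
    """
    if not data.strip():
        return "FileOutputCommitter (empty _SUCCESS)"
    summary = json.loads(data)
    return summary.get("committer", "unknown JSON _SUCCESS")
```

How you fetch the bytes is up to you; with boto3, for example, boto3.client("s3").get_object(Bucket="my-bucket", Key="curated/daily_events/_SUCCESS")["Body"].read() returns them.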

A short DON’T list

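Don’t use s3:// URIs against open-source Spark/Hadoop. Use s3a://. The scheme matters: Hadoop 3 has no filesystem registered for s3:// by default, so you get a “No FileSystem for scheme” error, and even if someone maps fs.s3.impl to S3A, settings keyed to the s3a scheme (such as the committer factory configured above) won’t apply.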

Don’t write a partitioned table to S3 with a thousand small files per partition. Each file is a separate object, each object is a separate PUT, and S3 rate-limits PUTs at around 3500/second per prefix. Coalesce or repartition first; aim for files of 128MB-1GB.

Don’t second-guess S3’s strong consistency. It works. Stop putting in defensive Thread.sleep (or time.sleep) calls “in case S3 hasn’t caught up.”

Don’t paste AWS keys into config. Use IAM roles. If you absolutely must use keys for a local-dev script, use the AWS profile file, never hardcoded strings.

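Don’t ignore spark.hadoop.fs.s3a.connection.maximum. The default has varied across Hadoop releases (older ones used 15), which is fine for small jobs. The pool belongs to each executor’s S3A filesystem instance, so size it for that executor’s concurrent tasks plus S3A’s own upload and copy threads (fs.s3a.threads.max); values of 100 or more are common for real workloads. If you see “Timeout waiting for connection from pool” in your logs, this is the knob.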
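For the small-files item above, choosing how many files to write is arithmetic: estimated output size divided by target file size. A sketch with a hypothetical helper; the size estimate is yours to supply, and 512 MB is just a default inside the 128MB-1GB range suggested above:

```python
import math

MB = 1024 ** 2


def files_for(total_bytes, target_file_bytes=512 * MB):
    """Number of output files so each lands near target_file_bytes (at least 1)."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


print(files_for(200 * 1024 ** 3))  # about 200 GiB of output
# -> 400
```

Pass the result to df.repartition(n) before an unpartitioned write. For a partitionBy write, each task emits one file per partition value it holds, so repartitioning on the partition column itself (df.repartition("event_date")) gives roughly one file per date directory; spark.sql.files.maxRecordsPerFile can split files for very large dates.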

Next lesson, schema evolution: what happens when the upstream team adds a column, renames one, or changes a type, and how the different file formats handle it. Parquet’s mergeSchema, Avro’s writer-reader resolution, and the Delta/Iceberg way of versioning schema changes in the table log.


References: Apache Spark documentation, Hadoop S3A documentation (https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html), AWS S3 strong consistency announcement (December 2020). Retrieved 2026-05-01.
