A Comprehensive Guide to Structured Streaming in Databricks

Structured Streaming is Apache Spark’s high-level API for processing real-time data streams. Its core innovation is treating a live data stream as a continuously growing, or unbounded, table. This allows developers to use the same familiar DataFrame API for both batch and streaming data processing, significantly simplifying the development of complex streaming applications.

The Core Concepts: Unbounded Tables and Streaming DataFrames

The Unbounded Table Abstraction

The main abstraction in Structured Streaming is the unbounded table. As new data arrives in a data stream (e.g., new messages in a Kafka topic), it is treated as new rows being appended to this conceptual table. You can then define a query on this input table, just as you would with a static one.

This model can be visualized as follows:

+----------+    +----------+    +----------+
| Record 3 | -> | Record 2 | -> | Record 1 | -> [Input Data Stream]
+----------+    +----------+    +----------+
      |
      V
+----------------------+
|                      |
|  Unbounded Table     |
| (in Spark's memory)  |
|  +----------------+  |
|  | Record 1       |  |
|  | Record 2       |  |
|  | Record 3       |  |
|  | ...            |  |
|  +----------------+  |
|                      |
+----------------------+

The Streaming DataFrame

A streaming DataFrame is the key data structure, created using the DataStreamReader (spark.readStream), that represents this unbounded table. You apply transformations to it just like a static DataFrame.
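
This can be sketched as follows, assuming a SparkSession named spark (as provided in Databricks notebooks); the built-in rate source is used so the example runs without any external infrastructure:

# Create a streaming DataFrame from the built-in "rate" source,
# which generates rows with `timestamp` and `value` columns
streaming_df = (
  spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

# Transformations use exactly the same API as a static DataFrame
transformed = streaming_df.filter("value % 2 = 0")

print(transformed.isStreaming)  # True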

Micro-Batch Processing: Near Real-Time or Batches?

By default, Structured Streaming operates as a micro-batch processing engine. It checks for new data from the source and processes it in small, discrete batches, giving the appearance of near real-time processing.

The trigger() Method: Controlling Batch Cadence

The trigger() method, part of the DataStreamWriter (df.writeStream), controls when these micro-batches are executed.

  • Default Trigger: Processes the next batch as soon as the previous one finishes.
  • ProcessingTime Trigger: trigger(processingTime='1 minute') executes a batch at a fixed time interval, as shown in the sketch below.
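
Both trigger styles can be sketched as follows; streaming_df, the sink names, and the checkpoint paths are placeholders:

# Default trigger: start the next micro-batch as soon as the previous one finishes
query_default = (
  streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint_a")
    .toTable("sink_table_a")
)

# Fixed-interval trigger: at most one micro-batch per minute
query_interval = (
  streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint_b")
    .trigger(processingTime="1 minute")
    .toTable("sink_table_b")
)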

This is different from Kafka’s auto.offset.reset property. Spark’s trigger() controls the processing frequency, while auto.offset.reset in ksqlDB/Kafka controls the starting position for a new consumer group that has no saved progress.

The outputMode() Method: Defining the Output

This method defines what data is written to the sink each time the stream is updated.

  • append: (Default) Only new rows added to the result table since the last trigger are written.
  • complete: The entire updated result table is written to the sink on every trigger. Aggregations without a watermark cannot use append mode, so they must use complete (or update) instead.
  • update: Only the rows in the result table that were updated since the last trigger are written.

How update Output Mode Determines Results

The update output mode is specifically designed for stateful streaming queries, most commonly those involving aggregations (GROUP BY). It works by intelligently tracking changes to the aggregated result table between micro-batches.

Here is the step-by-step process:

  1. Maintain State: Spark maintains the full, intermediate result table of the aggregation in memory, backed by the checkpoint location. This is known as the state store.
  2. Process New Data: When a new micro-batch of data arrives, Spark updates the values in its internal state store. For example, if a count for a specific key was 5 and a new record for that key arrives, the state is updated to 6.
  3. Compare and Emit: Before writing to the sink, Spark compares the newly updated state with the state from before the batch started. It identifies only the rows whose aggregation values have changed. Only these updated rows are emitted as the output for that micro-batch. Rows whose aggregation values did not change are not part of the output.

This process is what allows sinks like databases or key-value stores to be updated with just the latest changes, rather than having to process the entire result set each time.

This flow can be visualized as follows:

State at Time T0        New Data Arrives      State at Time T1          Output of `update` Mode
+----------------+        +--------+        +----------------+        +--------------------------+
| device | count |        | device |        | device | count |        | The row for 'B' changed, |
|--------|-------|        |--------|        |--------|-------|        | so only it is emitted.   |
|   A    |   5   |        |   B    |        |   A    |   5   |        |                          |
|   B    |   3   |------> |   A    |------> |   B    |   4   |------->|   B    |    4            |
+----------------+        |   B    |        +----------------+        +--------------------------+
                          +--------+
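
In code, the flow in the diagram corresponds to a simple streaming aggregation; a minimal sketch, assuming a streaming_df with a device column (the console sink and checkpoint path are placeholders):

device_counts = streaming_df.groupBy("device").count()

query = (
  device_counts.writeStream
    .outputMode("update")    # emit only rows whose aggregate changed
    .format("console")       # print each micro-batch's changed rows
    .option("checkpointLocation", "/path/to/checkpoint")
    .start()
)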

Fault Tolerance and Processing Guarantees

The checkpointLocation

The checkpoint location is the most critical component for making a stream reliable. It is a path to a directory in cloud storage where Spark persists the state of the stream, including:

  • Offsets: The exact progress in the source stream (which files have been read, which Kafka offset is next).
  • Write-Ahead Logs (WAL): A log of the transactions for each batch, used to ensure operations are atomic.
  • State: The intermediate data for stateful operations like streaming aggregations.

If a streaming job fails, it can be restarted from its checkpoint, and it will resume exactly where it left off. This prevents both data loss and duplicate processing.
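
Restart behavior can be sketched as follows, assuming hypothetical table names and a reusable checkpoint path:

checkpoint_path = "/mnt/checkpoints/events_stream"  # hypothetical path

def start_query():
    # Read a Delta table as a stream and write it to another table
    return (
      spark.readStream.table("bronze_events")
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .toTable("silver_events")
    )

query = start_query()
# ... the query runs, then fails or is stopped ...
query.stop()

# Restarting with the same checkpointLocation resumes from the recorded
# offsets, so no rows are lost and none are processed twice.
query = start_query()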

The Role of the Write-Ahead Log (WAL)

The Write-Ahead Log (WAL) is a fundamental component of Structured Streaming’s fault-tolerance mechanism, providing the foundation for its exactly-once processing guarantees. It is a detailed journal stored within the checkpoint directory that reliably tracks the status of each micro-batch.

The process follows a two-phase commit protocol:

  1. Log Intent to Process: When Spark is ready to process a new micro-batch (e.g., batch #101), the very first thing it does is write an entry to a log file in the checkpoint directory. This entry effectively says, “I am now starting to process batch #101, which corresponds to Kafka offsets 500-600.” This log is durably stored in cloud storage.
  2. Process Data and Write to Sink: Spark then executes the data processing logic for batch #101 and writes the results to the destination sink (e.g., a Delta table).
  3. Log Completion (Commit): Only after the data has been successfully written to the sink, Spark writes a final “commit” entry to the WAL for batch #101, marking it as complete.

This process ensures reliability during failures. If the job or cluster crashes after Step 2 but before Step 3, the data has been written to the sink, but the WAL does not have the final “commit” entry. Upon restart, Spark will read the WAL, see that batch #101 was started but never committed, and safely re-process the exact same data from the source (Kafka offsets 500-600). Because the sink (like a Delta table) is idempotent, writing the same data again does not create duplicates. This guarantees that no data is lost and each record is processed exactly once.
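
The batch-by-batch bookkeeping described above is visible through a query's progress metrics; a sketch, assuming a running StreamingQuery object named query (as returned by .start() in the examples later in this guide):

import json

# lastProgress describes the most recent micro-batch, including its batchId
# and the source offset range it covered (the same information the offset
# log in the checkpoint records before the batch runs)
if query.lastProgress:
    print(json.dumps(query.lastProgress, indent=2))

# recentProgress holds the last several progress reports
for progress in query.recentProgress:
    print(progress["batchId"], progress["sources"][0]["endOffset"])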

Checkpoints vs. Kafka Consumer Groups

Spark’s checkpointing mechanism is analogous to a Kafka consumer group’s committed offsets, as both track progress in a topic. However, Spark’s checkpoint is more comprehensive; in addition to the source offsets, it also stores the state of the computation itself (like running counts) and the write-ahead log, which are essential for its processing guarantees.

Exactly-Once Semantics

Structured Streaming can provide exactly-once processing guarantees when used with replayable data sources (like Kafka) and idempotent sinks (like Delta Lake). This is achieved through the checkpoint and write-ahead logs. Idempotency ensures that if Spark re-processes a batch after a failure, the duplicate data is not written to the target. This is handled at the batch/transaction level by Delta Lake, not by inspecting individual record content.
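
If you bypass the built-in sinks with foreachBatch, writes are no longer automatically idempotent. Delta Lake supports idempotent batch writes through transaction-identifier options; a sketch, assuming hypothetical table and application names and a streaming_df defined elsewhere:

app_id = "orders_pipeline"  # any stable, unique ID for this query

def write_batch(batch_df, batch_id):
    # txnAppId + txnVersion let Delta detect and skip a batch it has already
    # committed, so re-processing after a failure does not duplicate data
    (batch_df.write
       .format("delta")
       .option("txnAppId", app_id)
       .option("txnVersion", batch_id)
       .mode("append")
       .saveAsTable("orders_silver"))

query = (
  streaming_df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/path/to/checkpoint")
    .start()
)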

Stateful Streaming and Operations

Unsupported Operations

Operations that require a full, static view of the entire dataset, like sorting an entire stream or exact, unbounded deduplication, are generally not supported. This is a logical constraint similar to that faced by other engines like ksqlDB/Kafka Streams when performing stateful operations.

Windowing and Watermarking

To perform aggregations on a stream, you group data into finite windows.

  • Windowing: Allows for aggregations over fixed time intervals.
  • Watermarking: Defines a threshold for how late data is expected to arrive. This allows the streaming engine to know when it can safely drop old state for aggregations, preventing state from growing indefinitely.

Example of Windowing and Watermarking:

from pyspark.sql.functions import window

# Count events in 10-minute windows that slide every 5 minutes,
# dropping window state once data is more than 20 minutes late
event_counts = streaming_df \
    .withWatermark("event_timestamp", "20 minutes") \
    .groupBy(window("event_timestamp", "10 minutes", "5 minutes")) \
    .count()

Working with Streaming Views and Results

Temporary Views vs. Streaming Temporary Views

Feature     | Regular Temporary View             | Streaming Temporary View
------------|------------------------------------|------------------------------------------------
Source Data | Static DataFrame/Table             | Streaming DataFrame
Result      | A fixed, static result             | A dynamic result, updated each micro-batch
Use Case    | Naming intermediate batch results  | Interactively querying a running stream's state

A query on a streaming temporary view produces another streaming DataFrame, not a static one. This is conceptually very similar to a ksqlDB Table, which also represents the current, materialized state of an aggregated stream.
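
A minimal sketch, assuming a streaming_df with a device column; note that the result of the SQL query is itself a streaming DataFrame:

# Register the stream under a name visible to SQL
streaming_df.createOrReplaceTempView("events_stream")

# A query on the streaming view produces another streaming DataFrame
device_counts = spark.sql(
    "SELECT device, COUNT(*) AS n FROM events_stream GROUP BY device"
)
print(device_counts.isStreaming)  # True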

Persisting Incremental Results

The only way to persist the output of a stream is to write it to a sink, such as a Delta table.

Streaming vs. Static DataFrames

A streaming DataFrame represents a continuous flow, while a static DataFrame is a finite dataset. To get a static representation of a stream’s output, you must read from the sink table where the stream is writing its results.
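
A sketch, assuming a stream is writing its results to a Delta table named my_sink_table:

# Any batch query against the sink table reads a static snapshot of it
snapshot = spark.read.table("my_sink_table")
print(snapshot.count())  # a fixed number, as of the moment the query ran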

Streaming Queries

A streaming query is started with .start() and runs in the background as a long-running, persistent query, much like a persistent query in ksqlDB. It runs until it is explicitly stopped with .stop(). The .awaitTermination() method can be used to block a script’s main thread until the stream is stopped.

Terminating a Streaming Query

A Structured Streaming query can be terminated in two main ways: explicitly by calling the .stop() method on the query object, or implicitly when the associated Spark application shuts down.

Explicit Termination: Using .stop()

The most direct way to stop a running stream is by calling the .stop() method on the StreamingQuery object. When you start a stream using .start() or .toTable(), it returns this object, which acts as a handle to the running query. It’s a best practice to assign this object to a variable so you can manage the stream’s lifecycle.

Example:

# Start the stream and assign the query object to a variable
streaming_query = (
  spark.readStream
    .format("rate")
    .load()
    .writeStream
    .format("memory")
    .queryName("my_rate_stream")
    .start()
)

# You can check if the stream is active
# print(f"Is stream active? {streaming_query.isActive}")

# ... later, in another cell or part of the code, you can stop it
streaming_query.stop()

Implicit Termination

A streaming query will also terminate automatically if the underlying Spark application stops. This can happen in several scenarios:

  • Cluster Shutdown: If the cluster the stream is running on is manually terminated or has an auto-termination policy, the Spark application ends, and all associated streams are stopped.
  • Job Completion: If your streaming code is part of a Databricks Job, the stream will terminate when the job’s main task finishes. This is where .awaitTermination() becomes important.
  • Interactive Notebooks: When you detach a notebook from a cluster, the SparkSession for that notebook is eventually terminated, which stops its streams.

The Role of .awaitTermination()

The .awaitTermination() method does not stop the stream. Its purpose is to block the execution of your script’s main thread, effectively waiting until the stream is terminated by some other means (either by .stop() being called from another thread or by an external failure).

This is crucial for applications submitted as jobs. Without it, the main thread of your script would exit immediately after starting the stream, causing the Spark application to shut down and terminate your stream prematurely.

Example (for a script submitted as a job):

# Start a stream in the background
query = (
  spark.readStream
    .format("kafka")
    .option("subscribe", "my_topic")
    .load()
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .toTable("my_sink_table")
)

# The main thread will now wait here indefinitely until the query is stopped or fails.
query.awaitTermination()

Controlling Stream Execution: awaitTermination() vs. trigger(availableNow=True)

These two options control the execution lifecycle of a streaming query, but they are used for fundamentally different purposes.

Understanding trigger(availableNow=True)

This trigger modifies a streaming query to run like a batch job. When started, the query will process all available, unprocessed data in the source in one or more micro-batches, and then the stream automatically stops itself. This effectively transforms a stream into an incremental batch process.
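
A sketch of such a run-to-completion stream, with placeholder paths and table names:

query = (
  spark.readStream
    .format("cloudFiles")  # Auto Loader; any streaming source works here
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema_location")
    .load("/path/to/source_data")
    .writeStream
    .option("checkpointLocation", "/path/to/checkpoint")
    .trigger(availableNow=True)  # process everything outstanding, then stop
    .toTable("target_table")
)

# Blocks only until the backlog is processed; the query then stops on its own
query.awaitTermination()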

Comparing the Concepts
Concept       | trigger(availableNow=True)                                | .awaitTermination()
--------------|-----------------------------------------------------------|---------------------------------------------------------------
Purpose       | To run a streaming query as a single, finite batch job.   | To keep a long-running, continuous streaming job alive.
Stream Stops? | ✅ Yes, automatically after processing available data.    | ❌ No, it waits for an external stop (e.g., job cancellation).
Use Case      | Scheduled, incremental batch jobs (e.g., run every hour). | Continuous, always-on streaming applications.

Comparison: Spark Structured Streaming vs. ksqlDB

Feature          | Spark Structured Streaming           | ksqlDB / Kafka Streams
-----------------|--------------------------------------|-----------------------------------------
Core Engine      | Apache Spark (general-purpose)       | Kafka Streams (Kafka-native)
Language         | SQL, Python, Scala, R                | SQL-like (ksqlDB), Java/Scala
Data Sources     | Many (Kafka, files, JDBC, etc.)      | Kafka-centric
Processing Model | Micro-batch (primarily)              | Record-by-record stream processing
Ecosystem        | Part of broader Databricks ecosystem | Tightly integrated with Kafka ecosystem

Incremental Data Ingestion from Files

A core task in data engineering is incrementally ingesting new data files from cloud storage into reliable Delta tables. This process must be efficient, scalable, and fault-tolerant to avoid data loss or duplication. Databricks provides two primary features for this purpose: the COPY INTO SQL command for simple batch ingestion, and Auto Loader for robust, scalable streaming ingestion.

COPY INTO: Simple and Idempotent Batch Ingestion

The COPY INTO command is a SQL-native, idempotent way to load data from a file location into a Delta table. It is designed for simple, repeatable batch loading jobs.

How COPY INTO Works Idempotently

COPY INTO is idempotent, meaning that if you run the same command multiple times, it will not load the same files more than once. It achieves this by tracking which files have been successfully processed. This state is stored with the Delta table itself. When the command is run, it discovers all files in the source path and skips any file names it has already seen and recorded as ingested.

SQL Example:

COPY INTO bronze_events
FROM '/mnt/landing/raw_events/'
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true');
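
Re-running the command demonstrates the idempotency. A sketch using spark.sql from Python, assuming the table and path from the example above:

copy_cmd = """
  COPY INTO bronze_events
  FROM '/mnt/landing/raw_events/'
  FILEFORMAT = JSON
  COPY_OPTIONS ('mergeSchema' = 'true')
"""

first_run = spark.sql(copy_cmd)   # ingests all new files
second_run = spark.sql(copy_cmd)  # finds no unseen files, ingests nothing

# Each run returns a result describing how much data was actually loaded
second_run.show()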

Handling Updated Files

By default, COPY INTO is designed to process new files that appear in a source directory. It does not reprocess files that have been updated in place. It identifies files to process based on their name and, if available, their modification time, but its state management is geared toward preventing duplicate ingestion of the same file, not detecting changes within a file.

COPY INTO vs. Other Patterns

  • COPY INTO vs. Direct SELECT: A direct query like SELECT * FROM json.`/path/` is a stateless, ad-hoc operation that reads all files in the path every time. COPY INTO is a stateful loading command that remembers what it has already processed.
  • COPY INTO vs. View -> CTAS Pattern: COPY INTO is a great alternative to the View -> CTAS pattern for simpler ingestion scenarios. It combines file parsing and loading into a single, easy-to-use command. The View -> CTAS pattern remains more powerful when you need complex transformations, joins with other tables, or advanced cleaning logic before loading the data.

Auto Loader: Robust and Scalable Streaming Ingestion

Auto Loader is the recommended, most robust, and scalable Databricks feature for incremental data ingestion from cloud storage. While it uses the Structured Streaming API, it can be configured to run in discrete batches or continuously.

How Auto Loader Works

Auto Loader is used via the cloudFiles format in a streaming read. This format wraps the actual source file format (like JSON, CSV, or Parquet) and adds powerful file discovery capabilities. It can discover new files using either a simple directory listing or by subscribing to cloud provider notification services for large numbers of files.
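
The discovery mode is controlled by an option on the reader; a sketch with placeholder paths:

df = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema_location")
    # Directory listing is the default; notification-based discovery scales
    # better when a directory accumulates very large numbers of files
    .option("cloudFiles.useNotifications", "true")
    .load("/path/to/source_data")
)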

Checkpointing, Fault Tolerance, and Non-Redundant Processing

Like all Structured Streaming queries, Auto Loader relies on a checkpoint location. This checkpoint stores the state of which files have been discovered and processed. This is the key to both fault tolerance and non-redundant processing. If a job fails, it can restart from the checkpoint and know exactly which files to process next, guaranteeing that each file is processed exactly once.

Schema Inference and Evolution

A key advantage of Auto Loader is its robust support for schema management.

  • Schema Inference: It can automatically infer the schema of the source data by sampling the files.
  • Schema Storage and Evolution: It stores the inferred schema in a dedicated location (_schemas within the checkpoint directory) to ensure consistency across runs. It can also be configured to handle schema evolution: if new columns appear in the source files, Auto Loader can detect this and gracefully add them to the target Delta table without failing the pipeline, as sketched below.
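
These schema options can be sketched as follows; the paths are placeholders and the hint values are illustrative:

df = (
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema_location")  # where the inferred schema is stored
    .option("cloudFiles.inferColumnTypes", "true")                    # infer real types instead of all strings
    .option("cloudFiles.schemaHints", "event_ts TIMESTAMP")           # pin types you already know
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")        # add new source columns instead of failing
    .load("/path/to/source_data")
)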

Python Example

This example sets up an Auto Loader stream that runs as a triggered batch job, processing all new files that have arrived since the last run.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "/path/to/schema_location")
  .load("/path/to/source_data")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("mergeSchema", "true")  # Allows for schema evolution
  .trigger(availableNow=True)    # Processes all available data, then stops
  .table("target_delta_table")   # Writes to a Delta table
)

Comparing COPY INTO and Auto Loader

Feature          | COPY INTO                     | Auto Loader
-----------------|-------------------------------|------------------------------------------
Execution Model  | SQL Command (Batch)           | Streaming (Micro-batch or Continuous)
Simplicity       | ✅ Very simple                | More configuration required
Scalability      | Good for thousands of files   | ✅ Excellent for millions of files
Schema Evolution | Limited (mergeSchema option)  | ✅ Robust, automated handling
File Discovery   | Directory listing             | Directory listing or cloud notifications
Use Case         | Simple, scheduled batch loads | Robust, scalable, and automated ingestion

The Medallion Architecture: Building Incremental Pipelines

The Medallion Architecture, also known as a multi-hop pipeline, is a data design pattern used to logically organize data in a lakehouse. It ensures data quality and reliability by progressively transforming raw data through a series of tables, moving from a raw state to a validated state, and finally to a highly refined state for analytics.

What is the Multi-Hop Pipeline?

The core idea is to move data through three distinct layers, or tables: Bronze, Silver, and Gold. Data is ingested into the Bronze layer and then incrementally processed into the Silver and Gold layers.

This architecture can be visualized as follows:

+-------------+      +----------------+      +--------------------+      +-----------------+      +----------+
|             |      |                |      |                    |      |                 |      |          |
| Raw Sources |----->|  Bronze Table  |----->|  Silver Table      |----->|   Gold Table    |----->| BI / ML  |
| (JSON, CSV) |      | (Raw, Append)  |      | (Cleaned, Enriched)|      |  (Aggregated)   |      |          |
|             |      |                |      |                    |      |                 |      |          |
+-------------+      +----------------+      +--------------------+      +-----------------+      +----------+

Benefits of the Medallion Architecture

  • Incremental ETL: Pipelines only need to process new or changed data as it arrives at each stage, which is far more efficient than reprocessing the entire dataset each time.
  • Combining Streaming and Batch: The architecture seamlessly supports mixed workloads. For example, a Bronze table can be populated in near real-time by a streaming job, while a downstream Gold table can be created by a daily batch job that reads from the Silver table.
  • Re-creation of Tables: The Bronze layer serves as an immutable historical archive. If a bug is found in the business logic of the Silver or Gold tables, they can be easily dropped and completely recreated by reprocessing the raw data from the Bronze layer.

The Three Layers Explained

Bronze Tables (Raw Data)

The Bronze layer contains raw, unfiltered data ingested from source systems. The goal is to capture the source data in its original state. The schema often matches the source, with additional metadata columns added during ingestion, such as the load timestamp or source filename.

Silver Tables (Cleaned & Conformed Data)

The Silver layer provides a validated, enriched, and more structured view of the data. Data from the Bronze layer is filtered for quality, joined with other data sources to add context, and organized into well-defined tables with clean data types. This layer is often the source for data science and ad-hoc analytics.

Gold Tables (Aggregated Business-Level Data)

The Gold layer contains highly refined and aggregated data created for specific business use cases. These tables are often organized as data marts, with data optimized for analytics and reporting by BI tools. They are typically aggregated by business dimensions, such as by customer, product, or region.
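
One incremental hop can be sketched as follows, assuming hypothetical Bronze and Silver table names; reading a Delta table with readStream picks up only the rows appended since the last checkpointed offset:

silver_query = (
  spark.readStream.table("bronze_sales")   # incremental read of the Bronze table
    .where("event_type IS NOT NULL")       # basic quality filtering
    .select("event_ts", "device", "event_type")
    .writeStream
    .option("checkpointLocation", "/mnt/checkpoints/silver_sales")
    .trigger(availableNow=True)            # run as an incremental batch job
    .toTable("silver_sales")
)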

Building Streaming Pipelines with SQL

Databricks SQL provides powerful, simplified syntax for creating streaming data pipelines.

Creating Streaming Tables

A streaming table is a Delta table that is continuously populated by a streaming query. You can define it with a single CREATE STREAMING TABLE command. Databricks automatically manages the underlying checkpointing and micro-batching. The read from the source stream and the write to the target streaming table happen continuously.
The CREATE STREAMING TABLE command only executes on Unity-Catalog-enabled clusters; on legacy (non-UC) clusters it merely parses without running.

Auto Loader with SQL

Auto Loader is used in Databricks SQL via the cloud_files() table-valued function. This is the standard and correct way to incrementally ingest files from cloud storage in a streaming SQL pipeline, as it is designed specifically for this purpose and accepts all the cloudFiles.* options for schema inference and evolution.

Example:

This SQL statement creates a streaming table named bronze_raw_logs that incrementally ingests new JSON files from a source directory using the cloud_files function for Auto Loader.

CREATE STREAMING TABLE bronze_raw_logs
COMMENT 'Raw JSON logs ingested incrementally from cloud storage.'
AS SELECT
  *,
  _metadata.file_path AS source_file
FROM STREAM cloud_files(
  "/mnt/landing/logs/", "json",
  map("cloudFiles.schemaLocation", "/path/to/schema_storage")
);

Creating Streaming Views

A streaming view can also be created over a streaming source; it returns fresh results each time it is queried, without materializing data to a table.
