Databricks Lakeflow: The Unified Ingestion Engine

Databricks Lakeflow represents the next evolution of data engineering on the Databricks platform. It unifies the concepts of batch processing, streaming ingestion, and workflow orchestration into a single, cohesive experience. At its core, Lakeflow aims to simplify the most critical part of any data platform: reliably and efficiently getting data from various sources into the Lakehouse. This is achieved through a combination of enhanced connectors, intelligent ingestion services, and a declarative framework that builds upon the foundations of Delta Live Tables (DLT) and Databricks Jobs.

Lakeflow Connect: Bridging Data Sources to the Lakehouse

Getting data into the platform is the first hurdle. Lakeflow Connect provides a robust and simplified framework for connecting to a vast ecosystem of data sources, moving beyond traditional, manual configurations.

The Foundation of Ingestion

Connectors are the drivers that allow Databricks to read from and write to external systems. Lakeflow categorizes these into two main types: standard, self-managed connectors and a new tier of fully managed connectors.

Standard Connectors

For years, Apache Spark has shipped with a wide array of built-in connectors for file systems (like S3, ADLS, GCS), databases (via JDBC), and message queues (like Kafka). These are highly configurable and powerful, but often require developers to manage credentials, networking, and dependencies manually. They remain a core part of the platform for custom or unsupported sources.
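
As a minimal sketch of this self-managed pattern, the following defines a temporary view over a hypothetical PostgreSQL table using Spark's built-in JDBC source; the URL, table name, and credentials are placeholders, and in practice they would come from a secret scope rather than literals.

-- Self-managed JDBC source: the developer supplies the URL, credentials, and driver
CREATE TEMPORARY VIEW postgres_orders
USING org.apache.spark.sql.jdbc
OPTIONS (
  url 'jdbc:postgresql://db-host:5432/sales',
  dbtable 'public.orders',
  user 'reader',
  password '<password>'
);

-- Query it like any other table
SELECT * FROM postgres_orders LIMIT 10;
SQL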

Managed Connectors

Managed connectors are a game-changer for common sources like PostgreSQL, MySQL, and SQL Server. They provide a simplified, UI-driven setup that abstracts away the complexity of data movement. This model is conceptually similar to Kafka Connect, where the user declaratively defines the source and lets the platform handle the execution, fault tolerance, and state management.

Under the hood, a managed connector for a database source uses a multi-stage, secure architecture:

  1. Ingestion Gateway: A secure, Databricks-hosted service that establishes a connection to the source database.
  2. Staging & State Management: The gateway pulls data and lands it in an encrypted staging area within a Unity Catalog Volume. This Volume is also used to manage state, such as watermarks for change data capture (CDC).
  3. Managed Ingestion: A serverless Delta Live Tables (DLT) job, managed entirely by Databricks, picks up the data from the staging Volume, processes it, and loads it into a target Delta table.

This flow is designed for both initial snapshots and ongoing CDC replication, with credentials securely managed by Unity Catalog.

+--------------+      +-------------------+      +-----------------+      +-----------------+      +-----------------+
|  Source DB   |----->| Ingestion Gateway |----->|  UC Volume      |----->|  Managed        |----->|  Bronze Table   |
|              |      |(Databricks-hosted)|      |  (Staging Area) |      |  Ingestion (DLT)|      |  (Delta Lake)   |
+--------------+      +-------------------+      +-----------------+      +-----------------+      +-----------------+
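
The only part of this flow typically expressed in SQL is the first step: registering the source connection as a Unity Catalog object. A hedged sketch of that step is shown below, with an assumed connection name, host, and credentials; the ingestion gateway and managed pipeline themselves are normally created through the Lakeflow Connect UI or API.

-- Register a Unity Catalog connection for a hypothetical PostgreSQL source
-- (store real credentials in a secret scope rather than as literals)
CREATE CONNECTION postgres_sales TYPE postgresql
OPTIONS (
  host 'db-host.example.com',
  port '5432',
  user 'ingest_reader',
  password '<password>'
);
SQL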

Partner Connect

Partner Connect complements this ecosystem by providing a streamlined marketplace for integrating third-party data tools, such as Fivetran or dbt. While not a direct part of Lakeflow, it simplifies the initial setup for these partners, which in turn act as data sources for Lakeflow pipelines.

Core Ingestion Patterns

Lakeflow supports a spectrum of ingestion patterns, from simple batch loads to low-latency real-time streams, all targeting Delta Lake tables.

Batch Ingestion

Batch is ideal for one-time data loads or for sources that are only updated periodically.

Simple and Powerful: CTAS

The CREATE TABLE AS SELECT (CTAS) statement is the most straightforward way to perform a batch ingestion. Using the read_files function, you can ingest data directly from cloud storage.

CREATE OR REPLACE TABLE raw_logs
AS SELECT * FROM read_files(
  'abfss://<container>@<storage-account>.dfs.core.windows.net/raw/logs/',
  format => 'json'
);
SQL

This declarative SQL statement is equivalent to the more verbose programmatic Spark API:

# Equivalent PySpark Code
spark.read.format("json") \
  .load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/logs/") \
  .write.mode("overwrite") \
  .saveAsTable("raw_logs")
Python

Incremental and Streaming Ingestion

For sources with new data arriving continuously, an incremental or streaming approach is necessary to avoid reprocessing the entire dataset.

COPY INTO: The Legacy Incremental Tool

COPY INTO is a simple, idempotent command for incrementally loading new files from a location into a Delta table. It’s a two-step process: first, create the target table, then use COPY INTO to load data.

-- Step 1: Create the table (if it doesn't exist)
CREATE TABLE IF NOT EXISTS landing_orders (
  orderId INT,
  orderDate TIMESTAMP,
  amount DECIMAL(10, 2)
);

-- Step 2: Incrementally load new files
COPY INTO landing_orders
FROM 'abfss://<container>@<storage-account>.dfs.core.windows.net/new_orders/'
FILEFORMAT = 'PARQUET'
COPY_OPTIONS ('mergeSchema' = 'true');
SQL

The mergeSchema option allows the command to evolve the table schema when new files contain additional columns. While still fully supported, COPY INTO is now considered a legacy pattern; Auto Loader is the recommended approach for new incremental ingestion workloads.

Auto Loader: The Modern Standard

Auto Loader is the recommended method for all file-based incremental and streaming ingestion. It’s more scalable and feature-rich than COPY INTO. It can be used programmatically in Spark Structured Streaming or declaratively in DLT.

Its key advantages include:

  • Automatic File Discovery: Incrementally discovers new files in cloud storage, with an optional file-notification mode that avoids repeated full directory listings.
  • Schema Inference and Evolution: Infers the schema from the data and can gracefully handle schema changes over time.
  • Scalability: Built on Spark Structured Streaming for massive scalability.

Declarative Ingestion with DLT

In Delta Live Tables (which are now part of the Lakeflow experience), Auto Loader is abstracted via the read_files table-valued function within a streaming table definition.

-- DLT Pipeline Definition
CREATE OR REFRESH STREAMING TABLE raw_iot_data
AS SELECT * FROM STREAM read_files(
  's3://my-bucket/iot-sensors/',
  format => 'json'
);
SQL

How the pipeline is triggered determines the processing mode:

  • Incremental (Batch): A timed trigger (for example, an hourly SCHEDULE clause) processes new data in micro-batches on a schedule.
  • Streaming (Continuous): Running the pipeline in continuous mode processes new data as soon as it arrives, for low-latency use cases.

Handling Semi-Structured Data

A primary challenge in data ingestion is handling schema-less or semi-structured data like JSON. Lakeflow provides powerful tools for this, centered around the VARIANT data type.

From Raw Strings to Structured Data

Raw JSON data is often ingested as a single string column. The goal is to parse it into a queryable format efficiently.

The Classic Approach: STRUCT

Traditionally, JSON strings were parsed into a strongly-typed STRUCT. This requires defining a schema upfront, often using schema_of_json to infer it from a sample of the data.

-- Parse a JSON string into a struct
SELECT from_json(
  '{"user_id": 1, "device": "mobile"}',
  schema_of_json('{"user_id": 1, "device": "mobile"}')
) AS parsed_data;
SQL

While fast to query, this approach is rigid. If the JSON structure changes, the ingestion job may fail or require manual schema updates.

The Flexible Future: VARIANT

The VARIANT data type is a revolutionary addition for handling semi-structured data. It ingests JSON, Avro, or Protobuf data as-is into a highly optimized, compressed binary format. The data is parsed only when a field is accessed at query time, offering immense flexibility. The primary function for this is parse_json.

CREATE OR REPLACE TABLE raw_events (
  event_id STRING,
  payload VARIANT
);

INSERT INTO raw_events
VALUES ('evt-123', parse_json('{"user_id": 101, "action": "login", "details": {"ip": "1.1.1.1"}}'));
SQL

Querying VARIANT Data

Querying VARIANT data is intuitive and powerful.

  • : is used for key navigation (path extraction).
  • :: is used for casting the extracted value to a specific data type.

SELECT
  payload:user_id::INT,
  payload:action::STRING,
  payload:details.ip::STRING AS ip_address
FROM raw_events;
SQL

JSON vs. STRUCT vs. VARIANT

| Feature            | JSON (String) | STRUCT                | VARIANT                |
|--------------------|---------------|-----------------------|------------------------|
| Storage Format     | Raw Text      | Columnar (Parsed)     | Optimized Binary       |
| Schema Enforcement | ❌ (None)     | ✅ (Strict, on write) | ✅ (Flexible, on read) |
| Query Performance  | Slow          | Fast                  | Very Fast              |
| Schema Evolution   | Manual        | Requires table ALTER  | Automatic              |
| Ease of Ingestion  | ✅ (Simple)   | ❌ (Requires schema)  | ✅ (Simple)            |

Refining Your Selections

Excluding Columns with EXCEPT

When working with wide tables, especially after expanding a STRUCT or VARIANT, the SELECT * EXCEPT(...) clause is invaluable for removing unneeded columns, such as the original raw string column after it has been successfully parsed.

-- Keep every column except the raw payload, alongside the parsed fields
CREATE TABLE clean_events AS
SELECT
  * EXCEPT (payload),
  payload:user_id::INT AS user_id,
  payload:action::STRING AS action
FROM raw_events;
SQL

Enhancing and Managing Ingested Data

Raw data often needs to be enriched with metadata and stored in a managed location before being transformed.

Enriching Data at the Source

Capturing File Metadata

When using Auto Loader (read_files), an implicit _metadata column is available. This allows you to capture valuable context about the source file, such as its name, size, and modification timestamp, directly into your target table.

CREATE OR REFRESH STREAMING TABLE landing_zone_files
AS SELECT
  *,
  _metadata.file_path,
  _metadata.file_modification_time
FROM STREAM read_files('s3://my-bucket/source/');
SQL

Unity Catalog Volumes: The Staging Ground

Unity Catalog Volumes are a secure, governed location within the metastore for storing and accessing non-tabular data, like raw files. They serve as the ideal staging area for data ingestion pipelines, especially for Managed Connectors, providing a unified governance model for both the data at rest and the final Delta tables.
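
As a small illustration, files staged in a Volume can be read with the same read_files function used throughout this article; the /Volumes/main/ingest/landing path below is a hypothetical catalog/schema/volume combination.

-- Read raw JSON files staged in a governed Unity Catalog Volume
SELECT * FROM read_files(
  '/Volumes/main/ingest/landing/',
  format => 'json'
);
SQL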

The Lakeflow Unification

Lakeflow is not a complete replacement of existing tools but a unification and rebranding that streamlines the user experience.

From DLT and Jobs to Lakeflow

The powerful engines you already use are now presented under the Lakeflow umbrella to emphasize their role in the end-to-end data flow.

  • Delta Live Tables are now referred to as Lakeflow Declarative Pipelines, highlighting their declarative nature for building reliable data pipelines.
  • Databricks Jobs are now Lakeflow Jobs, positioning them as the orchestration layer for all data tasks, including Lakeflow pipelines, notebooks, and dbt models.

Maintaining Data Quality and Performance

Upserts with MERGE INTO

The MERGE INTO command remains the cornerstone of change data capture (CDC) and data warehousing workloads. It allows you to efficiently perform INSERT, UPDATE, and DELETE operations on a Delta table based on a set of changes from a source table. Combined with WITH SCHEMA EVOLUTION, it can handle both data and schema changes seamlessly.
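
The sketch below shows the general shape of such a statement; the table names, join key, and op change-type flag are illustrative assumptions, not part of any specific Lakeflow API.

-- Apply a batch of change records to the target, evolving its schema if the source adds columns
MERGE WITH SCHEMA EVOLUTION INTO silver_orders AS t
USING orders_updates AS s
  ON t.orderId = s.orderId
WHEN MATCHED AND s.op = 'DELETE' THEN DELETE   -- s.op is a hypothetical change-type column
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SQL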

Table Maintenance: OPTIMIZE and Z-ORDER

Performance of a Delta table depends on its physical layout.

  • OPTIMIZE: This command compacts small files into larger, more optimal ones. This is crucial for read performance, as opening many small files is inefficient.
  • Z-ORDER: This is a technique used with OPTIMIZE that colocates related information in the same set of files. By clustering data on frequently filtered columns (Z-ORDER BY (colA, colB)), queries that filter by those columns can skip massive amounts of data, leading to significant performance gains (see the example below).
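
As a minimal example, both operations can be combined in a single statement; silver_orders and the clustering columns are placeholders for your own table and its most frequently filtered columns.

-- Compact small files and cluster on the columns most often used in query filters
OPTIMIZE silver_orders
ZORDER BY (orderDate, customerId);
SQL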
