From Raw Files to Reliable Tables: A Guide to Data Ingestion

Databricks provides powerful and flexible ways to work with data where it lives, allowing you to query files directly from cloud storage. This chapter explores methods for querying files ad-hoc, shows how to define persistent pointers to external data, and contrasts the patterns for ingesting that data into reliable, performant Delta Tables.

Ad-hoc File Querying

The most direct way to explore data is by querying files ad-hoc using a special SQL syntax. This is excellent for initial data discovery. When you run a query this way, Spark scans the files in the directory at that moment, so you are always querying the latest set of files present.

Directly Querying Files with file_format.`path`

You can query files directly by specifying their format (json, csv, etc.) followed by the path enclosed in backticks. Spark reads the data at that location and infers the schema on the fly.

SQL Example:

SELECT * FROM json.`/mnt/landing/raw_events/`;
SQL

Python Alternative:

display(spark.read.format("json").load("/mnt/landing/raw_events/"))
Python

Supported Formats

This syntax supports all standard file formats that Spark can read, including json, csv, parquet, orc, avro, and text.

Inspecting File Provenance with input_file_name()

When querying a directory of files, it’s often useful to know which source file a specific record came from. The input_file_name() function can be included in your SELECT statement to return the full path of the source file for each row.

SQL Example:

SELECT
  *,
  input_file_name() AS source_file
FROM json.`/mnt/landing/raw_events/`;
SQL

Python Alternative:

from pyspark.sql.functions import input_file_name

df = spark.read.format("json").load("/mnt/landing/raw_events/")
df_with_source = df.withColumn("source_file", input_file_name())
display(df_with_source)
Python

Previewing File Contents

Before writing a full query, you can get a quick look at a file’s content using dbutils or magic commands in a notebook.

Example:

# Using dbutils in Python to see the first 64KB of a file
print(dbutils.fs.head("/mnt/landing/raw_events/some_file.json"))
Python

Creating Pointers to External Data

For more regular use, instead of querying paths directly, you can create a permanent table in the metastore that points to an external location.

External Tables on Non-Delta Files

Using CREATE TABLE ... USING file_format creates a non-Delta table, which is essentially a metadata pointer to raw files. This table lacks the transactional benefits of Delta Lake. To see new files added to the source directory, you must manually run REFRESH TABLE table_name; before your query to force Spark to re-scan the directory. On very large datasets, this refresh operation can be time-consuming.

SQL Example:

CREATE TABLE my_raw_csv_table (id INT, name STRING)
USING CSV
OPTIONS (header = "true")
LOCATION '/mnt/raw_data/csv_files/';

-- You must run this command before querying to see new files
REFRESH TABLE my_raw_csv_table;
SQL
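
Python Alternative (a sketch, assuming the table above has already been created):

# Ask Spark to re-scan the table's file listing, then query it
spark.catalog.refreshTable("my_raw_csv_table")
display(spark.table("my_raw_csv_table"))
Python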

The Robust Ingestion Pattern: View -> CTAS

The most robust data engineering pattern involves ingesting data from external files into reliable Delta Tables. This is especially important for formats that do not carry an embedded schema, such as CSV and JSON, where schema inference can be unreliable.

The Temporary View -> CREATE TABLE AS SELECT (CTAS) pattern solves this by separating file parsing from data conversion.

  1. Step 1: The Temporary View (The “Reader”): The view’s only job is to correctly read and parse the raw source files using the OPTIONS clause to handle headers, delimiters, and schema inference.
  2. Step 2: The CTAS (The “Writer & Converter”): The CTAS statement then reads from this well-defined temporary view, performs any transformations (like casting data types), and converts the data into the Delta format (Parquet files + transaction log).

Basic Example: Ingesting a Single CSV File

SQL Example:

CREATE TEMPORARY VIEW raw_customers_view
USING CSV
OPTIONS (path = '/mnt/landing/customers.csv', header = 'true', inferSchema = 'true');

CREATE OR REPLACE TABLE bronze_customers
AS SELECT
  CAST(customer_id AS INT) AS customer_id,
  CAST(signup_date AS DATE) AS signup_date,
  trim(email) AS email
FROM raw_customers_view;
SQL

Python Alternative:

from pyspark.sql.functions import col, trim
from pyspark.sql.types import IntegerType, DateType

raw_df = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/mnt/landing/customers.csv"))

clean_df = raw_df.select(
  col("customer_id").cast(IntegerType()),
  col("signup_date").cast(DateType()),
  trim(col("email")).alias("email")
)

clean_df.write.format("delta").mode("overwrite").saveAsTable("bronze_customers")
Python

The Full Advantage: Why Convert to Delta Tables?

If the View -> CTAS pattern creates a point-in-time snapshot, what is its advantage over just using a non-Delta external table and REFRESH? The answer lies in both reliability and a massive performance improvement.

Beyond Transactions: The Performance Advantage

When the CTAS process creates a Delta Table, it is fundamentally transforming the data for analytics.

  • Format Conversion: It converts inefficient formats like CSV or JSON into the highly efficient, columnar Parquet format.
  • File Compaction: The process can solve the “small file problem” by compacting thousands of small source files into fewer, larger files (see the sketch after this list).
  • Data Skipping via Statistics: Delta Lake automatically collects statistics (min/max values) on your data. The query engine uses these statistics to skip reading files that could not possibly contain relevant data, drastically reducing query times.
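
As a rough sketch of the compaction point above: OPTIMIZE and DESCRIBE DETAIL are standard Delta Lake commands on Databricks, shown here against the bronze_customers table created by the earlier CTAS example (any existing Delta table works):

# Compact small files in an existing Delta table into fewer, larger ones
spark.sql("OPTIMIZE bronze_customers")

# Inspect file counts and sizes to confirm the effect of compaction
display(spark.sql("DESCRIBE DETAIL bronze_customers"))
Python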

Data Freshness: Batch vs. Streaming Patterns

The Batch Ingestion Pattern and Its Freshness

The View -> CTAS pattern is for batch processing. The resulting Delta table is a point-in-time snapshot and must be re-run on a schedule (e.g., hourly or daily) to ingest new data.

The Pattern for Near Real-Time Freshness

To guarantee a table has the latest data, a different pattern is needed: using Auto Loader with Delta Live Tables (DLT).

Auto Loader efficiently processes new data files as they arrive in cloud storage, and DLT is a framework that simplifies building the pipeline. This pattern creates a continuous flow where the target Delta table is updated within seconds or minutes of a new file arriving.

This continuous flow can be visualized as follows:

(Source Directory)      (Continuous DLT Pipeline)          (Permanent Destination)
+-----------------+      +-------------------------+        +--------------------+
| New file arrives| ---> |      Auto Loader        |        |                    |
| /mnt/landing/.. |      | (Detects new file)      | -----> | Bronze Delta Table |
|(e.g.,file3.json)|      |                         |        | (Data is appended) |
+-----------------+      +-------------------------+        | (Always up-to-date)|
                                                            +--------------------+

DLT Python Example:

import dlt

@dlt.table
def raw_events():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/mnt/landing/raw_events/")
  )
Python

Writing to Delta Tables: A Comparison of DML Commands

Once a Delta table is created, the next step in any data engineering pipeline is to write data to it. This can involve initial loading, appending new records, completely overwriting the data, or performing complex updates. Delta Lake provides a rich set of DML (Data Manipulation Language) and DDL (Data Definition Language) commands for these tasks.

Command Comparison Overview

This table provides a high-level summary of the primary commands used to write data to tables in Databricks.

Command                | Action              | Creates Table? | Handles Updates? | Handles Deletes? | Common Use Case
CREATE TABLE AS SELECT | Creates & Populates | Yes            | No               | No               | Initial, one-time creation of a table.
INSERT INTO            | Appends Data        | No             | No               | No               | Adding new records without changing existing ones.
INSERT OVERWRITE       | Replaces All Data   | No             | No               | No               | Completely refreshing a table with a new dataset.
MERGE INTO             | Upserts Data        | No             | Yes              | Yes              | Incrementally updating a table with new data (updates, inserts, deletes).

Detailed Command Explanations

CREATE TABLE AS SELECT (CTAS)

As covered previously, CTAS creates a new table and populates it with the results of a SELECT query in a single step. It is used for the initial creation of a table.

SQL Example:

CREATE TABLE sales_summary AS SELECT sale_date, SUM(amount) AS total_sales FROM raw_sales GROUP BY sale_date;
SQL

Python Alternative:

(spark.table("raw_sales")
  .groupBy("sale_date")
  .sum("amount")
  .withColumnRenamed("sum(amount)", "total_sales")
  .write.format("delta")
  .saveAsTable("sales_summary"))
Python

INSERT INTO

This command appends new rows to an existing table. It does not modify or delete any existing data.

SQL Example:

INSERT INTO sales_summary VALUES ('2025-07-11', 500.00);
SQL

Python Alternative:

from pyspark.sql import Row
new_sales_df = spark.createDataFrame([Row(sale_date='2025-07-11', total_sales=500.00)])

new_sales_df.write.format("delta").mode("append").saveAsTable("sales_summary")
Python

INSERT OVERWRITE

This command deletes all existing data in the table and replaces it with the new data from the SELECT query. The table schema is not changed.

SQL Example:

-- This will replace all existing data in sales_summary
INSERT OVERWRITE sales_summary
SELECT sale_date, SUM(amount) AS total_sales FROM new_raw_sales GROUP BY sale_date;
SQL

Python Alternative:

(spark.table("new_raw_sales")
  .groupBy("sale_date")
  .sum("amount")
  .withColumnRenamed("sum(amount)", "total_sales")
  .write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "false")  -- Important: only overwrite data, not schema
  .saveAsTable("sales_summary"))
Python

MERGE INTO (The Upsert)

This is the most powerful command for data manipulation in Delta Lake. It allows you to perform inserts, updates, and deletes all in a single atomic operation based on a join between a source and a target table.

Refreshing Data: INSERT OVERWRITE vs. MERGE INTO

When refreshing a table using a pattern like Temporary View -> Delta Table, the choice of command depends on the use case.

  • Use INSERT OVERWRITE for a Full Refresh: This approach is simple and effective when you want to completely replace the target table with the latest data from the source. It’s suitable for smaller dimension tables or when the source data doesn’t provide clear information about what has changed.
  • Use MERGE INTO for an Incremental Refresh: This is the more efficient and sophisticated approach. It’s used when you have a set of changes (new records and updated records) and you want to apply just those changes to the target table without rewriting the entire dataset. This is the standard for handling Change Data Capture (CDC) feeds.

Anatomy of the MERGE Statement

The MERGE statement has three key parts that work together.

  1. MERGE INTO target_table USING source_of_updates: This defines the destination table and the source of the changes.
  2. ON target.key = source.key: This is the join condition that determines if a row in the source matches a row in the target.
  3. WHEN MATCHED / WHEN NOT MATCHED clauses: These define the actions to take based on the join condition.
  • WHEN MATCHED THEN UPDATE SET ...: Specifies how to update the columns in the target table when a match is found.
  • WHEN MATCHED AND <condition> THEN DELETE: Deletes the row from the target table if a match is found and an additional condition is met.
  • WHEN NOT MATCHED THEN INSERT ...: Inserts a new row into the target table when a row exists in the source but not in the target.

SQL Example:

-- Assume 'daily_updates' contains new and updated sales records
MERGE INTO sales_summary AS target
USING daily_updates AS source
ON target.sale_date = source.sale_date
WHEN MATCHED THEN
  UPDATE SET target.total_sales = source.new_total_sales
WHEN NOT MATCHED THEN
  INSERT (sale_date, total_sales) VALUES (source.sale_date, source.new_total_sales);
SQL

Python Alternative:

from delta.tables import DeltaTable

# Load the target Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/sales_summary")
# Assume 'daily_updates_df' is a DataFrame with the new data

delta_table.alias("target").merge(
    source=daily_updates_df.alias("source"),
    condition="target.sale_date = source.sale_date"
  ).whenMatchedUpdate(
    set={"total_sales": "source.new_total_sales"}
  ).whenNotMatchedInsert(
    values={"sale_date": "source.sale_date", "total_sales": "source.new_total_sales"}
  ).execute()
Python
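
The conditional DELETE clause described in the anatomy above can be added to the same builder chain. A minimal sketch, assuming the source DataFrame carries a hypothetical is_deleted flag marking records to remove:

from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "sales_summary")

(delta_table.alias("target").merge(
    source=daily_updates_df.alias("source"),
    condition="target.sale_date = source.sale_date"
  )
  # Hypothetical flag on the source marking records to remove from the target
  .whenMatchedDelete(condition="source.is_deleted = true")
  .whenMatchedUpdate(set={"total_sales": "source.new_total_sales"})
  .whenNotMatchedInsert(values={"sale_date": "source.sale_date", "total_sales": "source.new_total_sales"})
  .execute())
Python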

Working with Complex Data: Structs, Arrays, and Relational Operations

Modern data engineering frequently involves working with complex, semi-structured data. Databricks and Spark SQL provide a rich set of functions and syntax to natively handle nested data structures like structs, arrays, and maps, and to perform powerful relational operations.

Querying Nested Data

Nested data often arrives as JSON strings which must be parsed into structured types for efficient querying.

Dot Syntax for STRUCT Types vs. Colon Syntax for JSON Strings

It is essential to distinguish between a STRUCT type and a string column containing JSON text.

  • The dot syntax (struct_col.field) is the standard way to access fields within a STRUCT column. This is a strongly-typed, performant operation.
  • The colon syntax (json_string_col:field) is a special operator for extracting fields directly from a STRING column that contains valid JSON. It is convenient for exploration, but it is less performant than a STRUCT and provides no type safety, and it applies only to JSON-formatted strings (see the short example below).
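
A quick illustration of the colon operator, issued through spark.sql to stay in Python. This is a sketch that assumes the raw_events table with its json_payload STRING column used later in this chapter, a user_id field in the payload, and a Databricks runtime where the : operator is available:

# Pull a single field straight out of a JSON string column with the ':' operator
display(spark.sql("SELECT json_payload:user_id AS user_id FROM raw_events"))
Python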

Converting JSON Strings to Structs

For reliable and performant querying, you should always convert JSON strings into STRUCT columns. This is typically a two-step process in SQL. Other structured formats like Avro are often read directly into structs.

  1. Extract the Schema: Use the schema_of_json() function to infer the schema from a sample of the JSON data.
  2. Apply the Schema: Use the from_json() function along with the extracted schema to parse the JSON string into a STRUCT column.

SQL Example:

-- Assume 'raw_events' has a column 'json_payload' which is a STRING.
-- schema_of_json() needs a literal JSON example, so first copy a sample payload:
--   SELECT json_payload FROM raw_events LIMIT 1;
SELECT from_json(
  json_payload,
  schema_of_json('{"event_id": 1, "user_id": "u123"}')  -- representative sample pasted from the data
) AS payload_struct
FROM raw_events;
SQL

Python Alternative:

from pyspark.sql.functions import from_json

# 'raw_events' is assumed to be registered as a table with a 'json_payload' STRING column
raw_events = spark.table("raw_events")

# First, infer the schema from the JSON strings (in production, define it explicitly for robustness)
json_schema = spark.read.json(raw_events.select("json_payload").rdd.map(lambda r: r.json_payload)).schema

# Apply the schema to the JSON string column
parsed_df = raw_events.withColumn("payload_struct", from_json("json_payload", json_schema))
display(parsed_df)
Python

Flattening Struct Fields

Once you have a STRUCT, you can easily “flatten” its fields into top-level columns using the .* syntax.

SQL Example:

-- Assume 'payload_struct' has fields 'event_id' and 'user_id'
SELECT payload_struct.* FROM parsed_table;
SQL

Python Alternative:

display(parsed_df.select("payload_struct.*"))
Python

Working with Arrays

Spark SQL provides many built-in functions for manipulating ARRAY type columns.

explode()

The explode() function takes an array and creates a new row for each element in that array. This is used to un-nest array data into a flat, relational format.

SQL Example:

-- If a row has an array [1, 2], it becomes two rows
SELECT order_id, explode(items) AS item_id FROM orders;
SQL

Python Alternative:

from pyspark.sql.functions import explode

display(orders_df.select("order_id", explode("items").alias("item_id")))
Python

collect_set() and array_distinct()

  • collect_set() is an aggregate function that gathers all unique values from a group into an array.
  • array_distinct() is a regular function that removes duplicates from an existing array column.

SQL Example:

-- Get a unique list of products for each order
SELECT order_id, collect_set(product_id) AS unique_products FROM order_items GROUP BY order_id;

-- Clean an existing array of tags
SELECT array_distinct(tags_array) FROM articles;
SQL

Python Alternative:

from pyspark.sql.functions import collect_set, array_distinct

display(order_items_df.groupBy("order_id").agg(collect_set("product_id").alias("unique_products")))

display(articles_df.withColumn("distinct_tags", array_distinct("tags_array")))
Python

Relational Set Operations

Join Operations

Joins are used to combine columns from two or more tables horizontally based on a related key. Databricks supports all standard join types: INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER.

SQL Example:

SELECT o.order_id, c.customer_name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id;
SQL

Python Alternative:

display(orders_df.join(customers_df, orders_df.customer_id == customers_df.customer_id, "inner"))
Python

UNION, INTERSECT, and MINUS (EXCEPT)

These operators combine rows from two result sets vertically. The queries must have the same number and type of columns.

  • UNION / UNION ALL: Appends rows from one result set to another. UNION removes duplicates while UNION ALL includes them.
  • INTERSECT: Returns only the rows that exist in both result sets.
  • EXCEPT (called MINUS in some SQL dialects): Returns the rows from the first result set that are not present in the second.

SQL Example:

-- Combine sales from two regions
SELECT * FROM sales_emea
UNION ALL
SELECT * FROM sales_apac;

-- Find customers who are not employees
SELECT id FROM customers
EXCEPT
SELECT id FROM employees;
SQL
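
Python Alternative (a sketch; as with the SQL version, the DataFrames must have matching schemas):

# Load the tables from the SQL example above as DataFrames
sales_emea_df = spark.table("sales_emea")
sales_apac_df = spark.table("sales_apac")
customers_df = spark.table("customers")
employees_df = spark.table("employees")

# UNION ALL equivalent: appends rows and keeps duplicates
combined_df = sales_emea_df.union(sales_apac_df)

# INTERSECT equivalent: rows present in both result sets
common_df = sales_emea_df.intersect(sales_apac_df)

# EXCEPT equivalent: ids present in customers but not in employees
non_employee_ids_df = customers_df.select("id").subtract(employees_df.select("id"))
Python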

The PIVOT Operation

A PIVOT operation transforms a table by turning unique values from one column into multiple new columns, aggregating data in the process. It turns a “long” data format into a “wide” one.

SQL Example:

Imagine a sales table with columns (month, category, amount). We can pivot it to see sales for each category per month.

SELECT * FROM (
  SELECT month, category, amount FROM monthly_sales
)
PIVOT (
  SUM(amount) FOR month IN ('Jan', 'Feb', 'Mar')
);
SQL

Python Alternative:

display(monthly_sales_df
  .groupBy("category")
  .pivot("month", ["Jan", "Feb", "Mar"])
  .sum("amount")
)
Python

Advanced Querying: Higher-Order Functions, UDFs, and Spark Execution

This chapter explores advanced querying techniques in Databricks, focusing on functions that manipulate complex data types like arrays, the creation of custom logic with User-Defined Functions (UDFs), and the fundamental principles of how Spark executes queries in parallel.

Higher-Order Functions for Complex Types

Higher-order functions are powerful SQL constructs that operate on complex data types like arrays and maps. They take lambda functions as arguments, allowing you to process each element within an array directly in your SQL query.

TRANSFORM Function

The TRANSFORM function applies a specified function to each element in an array and returns a new array of the same size containing the transformed elements. This is conceptually identical to the map() function found in many programming languages.

SQL Example:

Given a table items with an integer array column values, this query creates a new array where each value is squared.

SELECT
  values,
  TRANSFORM(values, value -> value * value) AS squared_values
FROM items;
SQL

Python Alternative:

from pyspark.sql.functions import transform

items_df.select(
  "values",
  transform("values", lambda value: value * value).alias("squared_values")
)
Python

FILTER Function

The FILTER function returns a new array containing only the elements from the input array that satisfy the condition of the provided boolean lambda function.

SQL Example:

This query filters the values array to keep only the even numbers.

SELECT
  values,
  FILTER(values, value -> value % 2 == 0) AS even_values
FROM items;
SQL

Python Alternative:

from pyspark.sql.functions import filter

items_df.select(
  "values",
  filter("values", lambda value: value % 2 == 0).alias("even_values")
)
Python

Subqueries

A subquery, or inner query, is a SELECT statement nested inside another SQL statement. Subqueries are commonly used in the WHERE clause to filter data based on the results of another query.

SQL Example:

This query finds all orders placed by customers from a specific region.

SELECT * FROM orders
WHERE customer_id IN (SELECT customer_id FROM customers WHERE region = 'EMEA');
SQL

Python Alternative:

In Python, this logic is typically achieved by first collecting the results of the inner query and then using that result to filter the main DataFrame, or more efficiently, by performing a join.

emea_customers = spark.table("customers").filter("region = 'EMEA'").select("customer_id")
orders_df.join(emea_customers, "customer_id", "inner")
Python

Extending Logic with User-Defined Functions (UDFs)

A User-Defined Function (UDF) is a custom function created to encapsulate logic that is not available through Spark’s built-in functions.

Are UDFs Permanent?

The permanence of a UDF depends on how it is registered.

  • Temporary UDFs: When you register a UDF in Python or Scala within a notebook, it is temporary and scoped to the current SparkSession. It disappears when the session ends and is not visible to other notebooks.
  • Permanent UDFs: With Unity Catalog, you can use CREATE FUNCTION in SQL to register a permanent UDF. This function is stored in the catalog as a database object, can be used across different sessions and notebooks, and has permissions that can be managed like any other object.

SQL Example (Permanent UDF):

CREATE FUNCTION main.utils.get_initials(name STRING)
RETURNS STRING
RETURN substr(split(name, ' ')[0], 1, 1) || substr(split(name, ' ')[1], 1, 1);
SQL

Python Example (Temporary UDF):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the Python function
def get_initials_py(name: str) -> str:
    parts = name.split(' ')
    return parts[0][0] + parts[1][0]

# Register it as a temporary UDF for the current session
get_initials_udf = udf(get_initials_py, StringType())
spark.udf.register("get_initials_temp", get_initials_udf)
Python

Inspecting Functions

To view the metadata for both built-in and user-defined functions, you can use the DESCRIBE FUNCTION command.

  • DESCRIBE FUNCTION: Shows basic information like the function name, input parameters, and return type.
  • DESCRIBE FUNCTION EXTENDED: Provides more detailed information, including a description of what the function does and usage examples.

SQL Example:

DESCRIBE FUNCTION EXTENDED filter;
SQL

Understanding Spark’s Parallel Execution Model

Apache Spark’s high performance comes from its ability to execute data processing tasks in parallel across a cluster of computers. Understanding this distributed model is fundamental to writing efficient and scalable data engineering code in Databricks.

The Core Components

A Spark application running on a cluster consists of three main components that work together.

The Driver Node

The Driver is the “brain” or orchestrator of the Spark application. When you run a notebook or submit a job, the driver process is responsible for:

  • Hosting the SparkSession, which is the entry point to Spark.
  • Analyzing your code (e.g., your DataFrame transformations and actions).
  • Creating a logical and physical execution plan, often visualized as a Directed Acyclic Graph (DAG) of tasks.
  • Requesting resources from the cluster manager.
  • Sending tasks to the worker nodes for execution and collecting the results.

The Worker Nodes and Executors

The Workers are the “muscles” of the operation. These are the individual machines in the cluster that perform the actual computations. Each worker node runs one or more executor processes. An executor is a Java Virtual Machine (JVM) process that has a set of CPU cores and a slice of memory allocated to it. Its job is to receive tasks from the driver, execute them on its assigned data, and store results in memory or on disk.

Data as Partitions

Spark does not view data as a single, monolithic file. Instead, it breaks DataFrames down into smaller logical chunks called partitions. Each partition is a subset of the total rows. This division is what enables parallelism, as different executors can work on different partitions of the same DataFrame simultaneously.
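
A small sketch of how to inspect and adjust partitioning from a notebook; the sales table is just an example name used elsewhere in this chapter:

# See how many partitions Spark has split the DataFrame into
df = spark.table("sales")
print(df.rdd.getNumPartitions())

# Redistribute the rows into 8 partitions so up to 8 tasks can run in parallel
repartitioned_df = df.repartition(8)
print(repartitioned_df.rdd.getNumPartitions())
Python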

The Execution Flow: From Code to Result

The interaction between these components follows a clear flow, which allows for massive parallelism.

  1. Code Submission: Your code is sent to the Driver node.
  2. Planning: The Driver analyzes the code and creates an optimized plan (a DAG) of tasks to be executed.
  3. Task Distribution: The Driver sends these tasks to the available executors on the worker nodes. Each task is typically designed to process one partition of data.
  4. Parallel Execution: The executors on all worker nodes process their assigned partitions in parallel.
  5. Result Aggregation: The results from each executor are sent back to the Driver, which combines them to produce the final result.

This flow can be visualized as follows:

                   +----------------+
                   |  Driver Node   |
                   | (Your Code)    |
                   | (SparkSession) |
                   | (Query Plan)   |
                   +----------------+
                        |         ^
          (Sends Tasks) |         | (Returns Results)
          |                 |                 |
          v                 v                 v
+-------------------+ +-------------------+ +-------------------+
|    Worker Node 1  | |    Worker Node 2  | |    Worker Node 3  |
| +---------------+ | | +---------------+ | | +---------------+ |
| |   Executor    | | | |   Executor    | | | |   Executor    | |
| | (Task on P_1) | | | | (Task on P_2) | | | | (Task on P_3) | |
| +---------------+ | | +---------------+ | | +---------------+ |
+-------------------+ +-------------------+ +-------------------+
          |                 |                 |
      [Partition 1]     [Partition 2]     [Partition 3]
       (Data Chunks from your source DataFrame)

Transformations and Actions: The “Lazy” Execution Model

Spark’s ability to create an optimized plan is enabled by its lazy execution model, which differentiates between two types of operations.

Transformations

Transformations are operations that create a new DataFrame from an existing one. Examples include select(), filter(), groupBy(), and join().

Transformations are lazy, meaning Spark does not execute them immediately. When you call a transformation, Spark simply adds it to its logical plan (the DAG) of what needs to be done.

Python Example:

# LAZY: Nothing is executed yet, Spark is just building a plan.
source_df = spark.read.table("sales")
filtered_df = source_df.filter("amount > 100")
aggregated_df = filtered_df.groupBy("category").count()
Python

Actions

Actions are operations that trigger the actual computation. Examples include show(), count(), collect(), and writing data with saveAsTable().

When an action is called, Spark reviews the entire DAG of transformations it has built, optimizes it, and then submits it as a job to the cluster for parallel execution.

Python Example:

# ACTION: This command triggers the read, filter, and groupBy operations
# to run in parallel across the cluster.
aggregated_df.show()
Python

This lazy model is critical for performance because it allows Spark to see the entire workflow from start to finish before executing anything, enabling it to perform powerful optimizations before distributing the work.
