From Raw Files to Reliable Tables: A Guide to Data Ingestion
Databricks provides powerful and flexible ways to work with data where it lives, allowing you to query files directly from cloud storage. This chapter covers methods for querying files ad hoc, defining persistent pointers to external data, and contrasting the different patterns for ingesting that data into reliable, performant Delta tables.
Ad-hoc File Querying
The most direct way to explore data is by querying files ad-hoc using a special SQL syntax. This is excellent for initial data discovery. When you run a query this way, Spark scans the files in the directory at that moment, so you are always querying the latest set of files present.
Directly Querying Files with file_format.`path`
You can query files directly by specifying their format (json, csv, etc.) followed by the path enclosed in backticks. Spark reads the data at that location and infers the schema on the fly.
SQL Example:
SELECT * FROM json.`/mnt/landing/raw_events/`;
Python Alternative:
display(spark.read.format("json").load("/mnt/landing/raw_events/"))
Supported Formats
This syntax supports all standard file formats that Spark can read, including json, csv, parquet, orc, avro, and text.
Inspecting File Provenance with input_file_name()
When querying a directory of files, it’s often useful to know which source file a specific record came from. The input_file_name() function can be included in your SELECT statement to return the full path of the source file for each row.
SQL Example:
SELECT
*,
input_file_name() AS source_file
FROM json.`/mnt/landing/raw_events/`;
Python Alternative:
from pyspark.sql.functions import input_file_name
df = spark.read.format("json").load("/mnt/landing/raw_events/")
df_with_source = df.withColumn("source_file", input_file_name())
display(df_with_source)
Previewing File Contents
Before writing a full query, you can get a quick look at a file’s content using dbutils or magic commands in a notebook.
Example:
# Using dbutils in Python to see the first 64KB of a file
print(dbutils.fs.head("/mnt/landing/raw_events/some_file.json"))
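In a notebook, the %fs magic command is an equivalent shortcut, e.g. a cell containing %fs head /mnt/landing/raw_events/some_file.json.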
Creating Pointers to External Data
For more regular use, instead of querying paths directly, you can create a permanent table in the metastore that points to an external location.
External Tables on Non-Delta Files
Using CREATE TABLE ... USING file_format creates a non-Delta table, which is essentially a metadata pointer to raw files. This table lacks the transactional benefits of Delta Lake. To see new files added to the source directory, you must manually run REFRESH TABLE table_name; before your query to force Spark to re-scan the directory. On very large datasets, this refresh operation can be time-consuming.
SQL Example:
CREATE TABLE my_raw_csv_table (id INT, name STRING)
USING CSV
OPTIONS (header = "true")
LOCATION '/mnt/raw_data/csv_files/';
-- You must run this command before querying to see new files
REFRESH TABLE my_raw_csv_table;
The Robust Ingestion Pattern: View -> CTAS
The most robust data engineering pattern involves ingesting data from external files into reliable Delta Tables. This is especially important for schema-less sources like CSV or JSON where schema inference can be unreliable.
The Temporary View -> CREATE TABLE AS SELECT (CTAS) pattern solves this by separating file parsing from data conversion.
- Step 1: The Temporary View (The “Reader”): The view’s only job is to correctly read and parse the raw source files, using the OPTIONS clause to handle headers, delimiters, and schema inference.
- Step 2: The CTAS (The “Writer & Converter”): The CTAS statement then reads from this well-defined temporary view, performs any transformations (like casting data types), and converts the data into the Delta format (Parquet files + transaction log).
Basic Example: Ingesting a Single CSV File
SQL Example:
CREATE TEMPORARY VIEW raw_customers_view
USING CSV
OPTIONS (path = '/mnt/landing/customers.csv', header = 'true', inferSchema = 'true');
CREATE OR REPLACE TABLE bronze_customers
AS SELECT CAST(customer_id AS INT) AS customer_id, CAST(signup_date AS DATE) AS signup_date, trim(email) AS email
FROM raw_customers_view;
Python Alternative:
from pyspark.sql.functions import col, trim
from pyspark.sql.types import IntegerType, DateType
raw_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/landing/customers.csv")
clean_df = raw_df.select(col("customer_id").cast(IntegerType()), col("signup_date").cast(DateType()), trim(col("email")).alias("email"))
clean_df.write.format("delta").mode("overwrite").saveAsTable("bronze_customers")
The Full Advantage: Why Convert to Delta Tables?
If the View -> CTAS pattern creates a point-in-time snapshot, what is its advantage over just using a non-Delta external table and REFRESH? The answer lies in both reliability and a massive performance improvement.
Beyond Transactions: The Performance Advantage
When the CTAS process creates a Delta Table, it is fundamentally transforming the data for analytics.
- Format Conversion: It converts inefficient formats like CSV or JSON into the highly efficient, columnar Parquet format.
- File Compaction: The process can solve the “small file problem” by compacting thousands of small source files into fewer, larger files.
- Data Skipping via Statistics: Delta Lake automatically collects statistics (min/max values) on your data. The query engine uses these statistics to skip reading files that could not possibly contain relevant data, drastically reducing query times.
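These optimizations can also be exercised explicitly after ingestion. Below is a minimal sketch that reuses the bronze_customers table and signup_date column from the earlier CTAS example; OPTIMIZE and ZORDER BY are separate Delta Lake maintenance commands, not something CTAS runs automatically.
# Compact small files and co-locate rows by signup_date to improve data skipping
spark.sql("OPTIMIZE bronze_customers ZORDER BY (signup_date)")
# Inspect the file counts, sizes, and other details Delta tracks for the table
display(spark.sql("DESCRIBE DETAIL bronze_customers"))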
Data Freshness: Batch vs. Streaming Patterns
The Batch Ingestion Pattern and Its Freshness
The View -> CTAS pattern is for batch processing. The resulting Delta table is a point-in-time snapshot, so the CTAS job must be re-run on a schedule (e.g., hourly or daily) to ingest new data.
The Pattern for Near Real-Time Freshness
To guarantee a table has the latest data, a different pattern is needed: using Auto Loader with Delta Live Tables (DLT).
Auto Loader efficiently processes new data files as they arrive in cloud storage, and DLT is a framework that simplifies building the pipeline. This pattern creates a continuous flow where the target Delta table is updated within seconds or minutes of a new file arriving.
This continuous flow can be visualized as follows:
 (Source Directory)       (Continuous DLT Pipeline)      (Permanent Destination)
+-----------------+      +-------------------------+     +--------------------+
| New file arrives| ---> |       Auto Loader       | --> | Bronze Delta Table |
| /mnt/landing/.. |      |   (Detects new file)    |     | (Data is appended) |
|(e.g.,file3.json)|      |                         |     | (Always up-to-date)|
+-----------------+      +-------------------------+     +--------------------+
DLT Python Example:
import dlt
@dlt.table
def raw_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/raw_events/")
    )
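Auto Loader is not tied to DLT. As a rough sketch, assuming illustrative paths, checkpoint location, and table name, the same cloudFiles source can feed an ordinary Structured Streaming write that appends into a Delta table:
# Incrementally ingest new JSON files into a Delta table outside of DLT
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/raw_events/schema")  # tracks the inferred schema
    .load("/mnt/landing/raw_events/")
    .writeStream
    .option("checkpointLocation", "/mnt/checkpoints/raw_events/")
    .trigger(availableNow=True)  # process all available files, then stop; omit for a continuous stream
    .toTable("bronze_raw_events"))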
Writing to Delta Tables: A Comparison of DML Commands
Once a Delta table is created, the next step in any data engineering pipeline is to write data to it. This can involve initial loading, appending new records, completely overwriting the data, or performing complex updates. Delta Lake provides a rich set of DML (Data Manipulation Language) and DDL (Data Definition Language) commands for these tasks.
Command Comparison Overview
This table provides a high-level summary of the primary commands used to write data to tables in Databricks.
| Command | Action | Creates Table? | Handles Updates? | Handles Deletes? | Common Use Case |
| --- | --- | --- | --- | --- | --- |
| CREATE TABLE AS SELECT | Creates & Populates | ✅ | ❌ | ❌ | Initial, one-time creation of a table. |
| INSERT INTO | Appends Data | ❌ | ❌ | ❌ | Adding new records without changing existing ones. |
| INSERT OVERWRITE | Replaces All Data | ❌ | ❌ | ❌ | Completely refreshing a table with a new dataset. |
| MERGE INTO | Upserts Data | ❌ | ✅ | ✅ | Incrementally updating a table with new data (updates, inserts, deletes). |
Detailed Command Explanations
CREATE TABLE AS SELECT (CTAS)
As covered previously, CTAS creates a new table and populates it with the results of a SELECT query in a single step. It is used for the initial creation of a table.
SQL Example:
CREATE TABLE sales_summary AS SELECT sale_date, SUM(amount) AS total_sales FROM raw_sales GROUP BY sale_date;
Python Alternative:
(spark.table("raw_sales")
.groupBy("sale_date")
.sum("amount")
.withColumnRenamed("sum(amount)", "total_sales")
.write.format("delta")
.saveAsTable("sales_summary"))
INSERT INTO
This command appends new rows to an existing table. It does not modify or delete any existing data.
SQL Example:
INSERT INTO sales_summary SELECT '2025-07-11', 500.00;
Python Alternative:
from pyspark.sql import Row
new_sales_df = spark.createDataFrame([Row(sale_date='2025-07-11', total_sales=500.00)])
new_sales_df.write.format("delta").mode("append").saveAsTable("sales_summary")
INSERT OVERWRITE
This command deletes all existing data in the table and replaces it with the new data from the SELECT query. The table schema is not changed.
SQL Example:
-- This will replace all existing data in sales_summary
INSERT OVERWRITE sales_summary
SELECT sale_date, SUM(amount) AS total_sales FROM new_raw_sales GROUP BY sale_date;
Python Alternative:
(spark.table("new_raw_sales")
.groupBy("sale_date")
.sum("amount")
.withColumnRenamed("sum(amount)", "total_sales")
.write.format("delta")
.mode("overwrite")
.option("overwriteSchema", "false") -- Important: only overwrite data, not schema
.saveAsTable("sales_summary"))
MERGE INTO (The Upsert)
This is the most powerful command for data manipulation in Delta Lake. It allows you to perform inserts, updates, and deletes all in a single atomic operation based on a join between a source and a target table.
Refreshing Data: INSERT OVERWRITE vs. MERGE INTO
When refreshing a table using a pattern like Temporary View -> Delta Table, the choice of command depends on the use case.
- Use INSERT OVERWRITE for a Full Refresh: This approach is simple and effective when you want to completely replace the target table with the latest data from the source. It’s suitable for smaller dimension tables or when the source data doesn’t provide clear information about what has changed.
- Use MERGE INTO for an Incremental Refresh: This is the more efficient and sophisticated approach. It’s used when you have a set of changes (new records and updated records) and you want to apply just those changes to the target table without rewriting the entire dataset. This is the standard for handling Change Data Capture (CDC) feeds.
Anatomy of the MERGE Statement
The MERGE statement has three key parts that work together.
- MERGE INTO target_table USING source_of_updates: This defines the destination table and the source of the changes.
- ON target.key = source.key: This is the join condition that determines if a row in the source matches a row in the target.
- WHEN MATCHED / WHEN NOT MATCHED clauses: These define the actions to take based on the join condition.
  - WHEN MATCHED THEN UPDATE SET ...: Specifies how to update the columns in the target table when a match is found.
  - WHEN MATCHED AND <condition> THEN DELETE: Deletes the row from the target table if a match is found and an additional condition is met.
  - WHEN NOT MATCHED THEN INSERT ...: Inserts a new row into the target table when a row exists in the source but not in the target.
SQL Example:
-- Assume 'daily_updates' contains new and updated sales records
MERGE INTO sales_summary AS target
USING daily_updates AS source
ON target.sale_date = source.sale_date
WHEN MATCHED THEN
UPDATE SET target.total_sales = source.new_total_sales
WHEN NOT MATCHED THEN
INSERT (sale_date, total_sales) VALUES (source.sale_date, source.new_total_sales)
Python Alternative:
from delta.tables import DeltaTable
# Load the target Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/sales_summary")
# Assume 'daily_updates_df' is a DataFrame with the new data
delta_table.alias("target").merge(
source=daily_updates_df.alias("source"),
condition="target.sale_date = source.sale_date"
).whenMatchedUpdate(
set={"total_sales": "source.new_total_sales"}
).whenNotMatchedInsert(
values={"sale_date": "source.sale_date", "total_sales": "source.new_total_sales"}
).execute()
Working with Complex Data: Structs, Arrays, and Relational Operations
Modern data engineering frequently involves working with complex, semi-structured data. Databricks and Spark SQL provide a rich set of functions and syntax to natively handle nested data structures like structs, arrays, and maps, and to perform powerful relational operations.
Querying Nested Data
Nested data often arrives as JSON strings which must be parsed into structured types for efficient querying.
Dot Syntax for STRUCT Types vs. Colon Syntax for JSON Strings
It is essential to distinguish between a STRUCT type and a string column containing JSON text.
- The dot syntax (struct_col.field) is the standard way to access fields within a STRUCT column. This is a strongly typed, performant operation.
- The colon syntax (json_string_col:field) is a special operator used only for extracting fields directly from a string column that contains valid JSON. This is less performant and provides no type safety. It is primarily supported for JSON string formats.
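A brief Python sketch of the two access styles, assuming the raw_events_df and parsed_df DataFrames and the json_payload/event_id names used in the examples that follow (the colon operator is a Databricks SQL feature):
from pyspark.sql.functions import col
# Dot syntax on a STRUCT column: strongly typed field access
parsed_df.select(col("payload_struct.event_id"))
# Colon syntax on a raw JSON string column, parsed by the Databricks SQL engine (returns a STRING)
raw_events_df.selectExpr("json_payload:event_id AS event_id")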
Converting JSON Strings to Structs
For reliable and performant querying, you should always convert JSON strings into STRUCT columns. This is typically a two-step process in SQL. Other structured formats like Avro are often read directly into structs.
- Extract the Schema: Use the schema_of_json() function to infer the schema from a sample of the JSON data.
- Apply the Schema: Use the from_json() function along with the extracted schema to parse the JSON string into a STRUCT column.
SQL Example:
-- Assume 'raw_events' has a column 'json_payload' which is a STRING
-- schema_of_json() needs a literal JSON sample, so pass a representative string
SELECT from_json(json_payload, schema_of_json('{"event_id": 1, "user_id": 42}')) AS payload_struct
FROM raw_events;
Python Alternative:
from pyspark.sql.functions import from_json
# Load the raw table and infer the schema from the JSON strings
# (the schema is often defined explicitly in Python for robustness)
raw_events_df = spark.table("raw_events")
json_schema = spark.read.json(raw_events_df.select("json_payload").rdd.map(lambda r: r.json_payload)).schema
# Apply the schema to the JSON string column
parsed_df = raw_events_df.withColumn("payload_struct", from_json("json_payload", json_schema))
display(parsed_df)
Flattening Struct Fields
Once you have a STRUCT, you can easily “flatten” its fields into top-level columns using the .* syntax.
SQL Example:
-- Assume 'payload_struct' has fields 'event_id' and 'user_id'
SELECT payload_struct.* FROM parsed_table;
Python Alternative:
display(parsed_df.select("payload_struct.*"))
Working with Arrays
Spark SQL provides many built-in functions for manipulating ARRAY type columns.
explode()
The explode() function takes an array and creates a new row for each element in that array. This is used to un-nest array data into a flat, relational format.
SQL Example:
-- If a row has an array [1, 2], it becomes two rows
SELECT order_id, explode(items) AS item_id FROM orders;
Python Alternative:
from pyspark.sql.functions import explode
display(orders_df.select("order_id", explode("items").alias("item_id")))
collect_set() and array_distinct()
collect_set() is an aggregate function that gathers all unique values from a group into an array. array_distinct() is a regular function that removes duplicates from an existing array column.
SQL Example:
-- Get a unique list of products for each order
SELECT order_id, collect_set(product_id) AS unique_products FROM order_items GROUP BY order_id;
-- Clean an existing array of tags
SELECT array_distinct(tags_array) FROM articles;
Python Alternative:
from pyspark.sql.functions import collect_set, array_distinct
display(order_items_df.groupBy("order_id").agg(collect_set("product_id").alias("unique_products")))
display(articles_df.withColumn("distinct_tags", array_distinct("tags_array")))
Relational Set Operations
Join Operations
Joins are used to combine columns from two or more tables horizontally based on a related key. Databricks supports all standard join types: INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER.
SQL Example:
SELECT o.order_id, c.customer_name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id;
Python Alternative:
display(orders_df.join(customers_df, orders_df.customer_id == customers_df.customer_id, "inner"))
PythonUNION
, INTERSECT
, and MINUS
(EXCEPT
)
These operators combine rows from two result sets vertically. The queries must have the same number and type of columns.
- UNION / UNION ALL: Appends rows from one result set to another. UNION removes duplicates while UNION ALL includes them.
- INTERSECT: Returns only the rows that exist in both result sets.
- EXCEPT (called MINUS in some SQL dialects): Returns the rows from the first result set that are not present in the second.
SQL Example:
-- Combine sales from two regions
SELECT * FROM sales_emea
UNION ALL
SELECT * FROM sales_apac;
-- Find customers who are not employees
SELECT id FROM customers
EXCEPT
SELECT id FROM employees;
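Python Alternative: a sketch using the DataFrame API equivalents, assuming the same table names as the SQL above and compatible schemas.
# UNION ALL equivalent: union() appends rows and keeps duplicates (add .distinct() for UNION semantics)
combined_sales_df = spark.table("sales_emea").union(spark.table("sales_apac"))
# EXCEPT equivalent: subtract() returns distinct rows of the first DataFrame not present in the second
non_employee_ids_df = spark.table("customers").select("id").subtract(spark.table("employees").select("id"))
# INTERSECT equivalent: intersect() returns distinct rows present in both
shared_ids_df = spark.table("customers").select("id").intersect(spark.table("employees").select("id"))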
The PIVOT Operation
A PIVOT operation transforms a table by turning unique values from one column into multiple new columns, aggregating data in the process. It turns a “long” data format into a “wide” one.
SQL Example:
Imagine a sales table with columns (month, category, amount). We can pivot it to see sales for each category per month.
SELECT * FROM (
SELECT month, category, amount FROM monthly_sales
)
PIVOT (
SUM(amount) FOR month IN ('Jan', 'Feb', 'Mar')
);
Python Alternative:
display(monthly_sales_df
.groupBy("category")
.pivot("month", ["Jan", "Feb", "Mar"])
.sum("amount")
)
Advanced Querying: Higher-Order Functions, UDFs, and Spark Execution
This chapter explores advanced querying techniques in Databricks, focusing on functions that manipulate complex data types like arrays, the creation of custom logic with User-Defined Functions (UDFs), and the fundamental principles of how Spark executes queries in parallel.
Higher-Order Functions for Complex Types
Higher-order functions are powerful SQL constructs that operate on complex data types like arrays and maps. They take lambda functions as arguments, allowing you to process each element within an array directly in your SQL query.
TRANSFORM Function
The TRANSFORM function applies a specified function to each element in an array and returns a new array of the same size containing the transformed elements. This is conceptually identical to the map() function found in many programming languages.
SQL Example:
Given a table items with an integer array column values, this query creates a new array where each value is squared.
SELECT
values,
TRANSFORM(values, value -> value * value) AS squared_values
FROM items;
Python Alternative:
from pyspark.sql.functions import transform
items_df.select(
"values",
transform("values", lambda value: value * value).alias("squared_values")
)
FILTER Function
The FILTER function returns a new array containing only the elements from the input array that satisfy the condition of the provided boolean lambda function.
SQL Example:
This query filters the values array to keep only the even numbers.
SELECT
values,
FILTER(values, value -> value % 2 == 0) AS even_values
FROM items;
Python Alternative:
from pyspark.sql.functions import filter
items_df.select(
"values",
filter("values", lambda value: value % 2 == 0).alias("even_values")
)
Subqueries
A subquery, or inner query, is a SELECT statement nested inside another SQL statement. Subqueries are commonly used in the WHERE clause to filter data based on the results of another query.
SQL Example:
This query finds all orders placed by customers from a specific region.
SELECT * FROM orders
WHERE customer_id IN (SELECT customer_id FROM customers WHERE region = 'EMEA');
Python Alternative:
In Python, this logic is typically achieved by first collecting the results of the inner query and then using that result to filter the main DataFrame, or more efficiently, by performing a join.
emea_customers = spark.table("customers").filter("region = 'EMEA'").select("customer_id")
orders_df.join(emea_customers, "customer_id", "inner")
Extending Logic with User-Defined Functions (UDFs)
A User-Defined Function (UDF) is a custom function created to encapsulate logic that is not available through Spark’s built-in functions.
Are UDFs Permanent?
The permanence of a UDF depends on how it is registered.
- Temporary UDFs: When you register a UDF in Python or Scala within a notebook, it is temporary and scoped to the current SparkSession. It disappears when the session ends and is not visible to other notebooks.
- Permanent UDFs: With Unity Catalog, you can use CREATE FUNCTION in SQL to register a permanent UDF. This function is stored in the catalog as a database object, can be used across different sessions and notebooks, and has permissions that can be managed like any other object.
SQL Example (Permanent UDF):
CREATE FUNCTION main.utils.get_initials(name STRING)
RETURNS STRING
RETURN substr(split(name, ' ')[0], 1, 1) || substr(split(name, ' ')[1], 1, 1);
Python Example (Temporary UDF):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define the Python function
def get_initials_py(name: str) -> str:
    parts = name.split(' ')
    return parts[0][0] + parts[1][0]
# Register it as a temporary UDF for the current session
get_initials_udf = udf(get_initials_py, StringType())
spark.udf.register("get_initials_temp", get_initials_udf)
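Once registered, the temporary UDF can be called from SQL for the rest of the session; the input below is just sample data, and the commented DataFrame usage assumes a hypothetical people_df with a full_name column.
# Call the session-scoped UDF from SQL (name registered above)
spark.sql("SELECT get_initials_temp('Ada Lovelace') AS initials").show()
# Or apply the wrapped UDF to a DataFrame column (people_df and full_name are hypothetical)
# display(people_df.withColumn("initials", get_initials_udf("full_name")))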
Inspecting Functions
To view the metadata for both built-in and user-defined functions, you can use the DESCRIBE FUNCTION command.
- DESCRIBE FUNCTION: Shows basic information like the function name, input parameters, and return type.
- DESCRIBE FUNCTION EXTENDED: Provides more detailed information, including a description of what the function does and usage examples.
SQL Example:
DESCRIBE FUNCTION EXTENDED filter;
Understanding Spark’s Parallel Execution Model
Apache Spark’s high performance comes from its ability to execute data processing tasks in parallel across a cluster of computers. Understanding this distributed model is fundamental to writing efficient and scalable data engineering code in Databricks.
The Core Components
A Spark application running on a cluster consists of three main components that work together.
The Driver Node
The Driver is the “brain” or orchestrator of the Spark application. When you run a notebook or submit a job, the driver process is responsible for:
- Hosting the SparkSession, which is the entry point to Spark.
- Analyzing your code (e.g., your DataFrame transformations and actions).
- Creating a logical and physical execution plan, often visualized as a Directed Acyclic Graph (DAG) of tasks.
- Requesting resources from the cluster manager.
- Sending tasks to the worker nodes for execution and collecting the results.
The Worker Nodes and Executors
The Workers are the “muscles” of the operation. These are the individual machines in the cluster that perform the actual computations. Each worker node runs one or more executor processes. An executor is a Java Virtual Machine (JVM) process that has a set of CPU cores and a slice of memory allocated to it. Its job is to receive tasks from the driver, execute them on its assigned data, and store results in memory or on disk.
Data as Partitions
Spark does not view data as a single, monolithic file. Instead, it breaks DataFrames down into smaller logical chunks called partitions. Each partition is a subset of the total rows. This division is what enables parallelism, as different executors can work on different partitions of the same DataFrame simultaneously.
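A quick way to see partitioning in practice, reusing the sales table from the examples below (the partition count of 8 is arbitrary):
# How many partitions Spark will process in parallel for this DataFrame
df = spark.read.table("sales")
print(df.rdd.getNumPartitions())
# Redistribute the rows into 8 partitions (this triggers a shuffle when an action runs)
df_repartitioned = df.repartition(8)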
The Execution Flow: From Code to Result
The interaction between these components follows a clear flow, which allows for massive parallelism.
- Code Submission: Your code is sent to the Driver node.
- Planning: The Driver analyzes the code and creates an optimized plan (a DAG) of tasks to be executed.
- Task Distribution: The Driver sends these tasks to the available executors on the worker nodes. Each task is typically designed to process one partition of data.
- Parallel Execution: The executors on all worker nodes process their assigned partitions in parallel.
- Result Aggregation: The results from each executor are sent back to the Driver, which combines them to produce the final result.
This flow can be visualized as follows:
+----------------+
| Driver Node |
| (Your Code) |
| (SparkSession) |
| (Query Plan) |
+----------------+
                 |    ^
  (Sends Tasks)  |    |  (Returns Results)
                 v    |
        |                  |                  |
+-------------------+ +-------------------+ +-------------------+
| Worker Node 1 | | Worker Node 2 | | Worker Node 3 |
| +---------------+ | | +---------------+ | | +---------------+ |
| | Executor | | | | Executor | | | | Executor | |
| | (Task on P_1) | | | | (Task on P_2) | | | | (Task on P_3) | |
| +---------------+ | | +---------------+ | | +---------------+ |
+-------------------+ +-------------------+ +-------------------+
| | |
[Partition 1] [Partition 2] [Partition 3]
(Data Chunks from your source DataFrame)
Transformations and Actions: The “Lazy” Execution Model
Spark’s ability to create an optimized plan is enabled by its lazy execution model, which differentiates between two types of operations.
Transformations
Transformations are operations that create a new DataFrame from an existing one. Examples include select(), filter(), groupBy(), and join().
Transformations are lazy, meaning Spark does not execute them immediately. When you call a transformation, Spark simply adds it to its logical plan (the DAG) of what needs to be done.
Python Example:
# LAZY: Nothing is executed yet, Spark is just building a plan.
source_df = spark.read.table("sales")
filtered_df = source_df.filter("amount > 100")
aggregated_df = filtered_df.groupBy("category").count()
Actions
Actions are operations that trigger the actual computation. Examples include show(), count(), collect(), and writing data with saveAsTable().
When an action is called, Spark reviews the entire DAG of transformations it has built, optimizes it, and then submits it as a job to the cluster for parallel execution.
Python Example:
# ACTION: This command triggers the read, filter, and groupBy operations
# to run in parallel across the cluster.
aggregated_df.show()
This lazy model is critical for performance because it allows Spark to see the entire workflow from start to finish before executing anything, enabling it to perform powerful optimizations before distributing the work.
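To see the plan Spark has accumulated before anything runs, you can ask for it explicitly; this reuses the aggregated_df built in the transformation example above.
# Print the parsed, analyzed, optimized, and physical plans without executing the query
aggregated_df.explain(True)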