In the world of data management, we often talk about schema migrations. But what exactly are schema migrations, and why is it so crucial to have a proper plan for data management?
Introduction
Schema migrations, in essence, are the processes by which we evolve our data schema over time. Migrations describe the steps we need to take to modify our database schema while preserving existing data. This could be adding new tables, altering existing tables, or even deleting ones that are no longer needed. In today’s agile world, new features keep coming from the business, and they usually require changes to the data structures as well. Creating a new table or dropping an existing one is the simplest form of schema migration and usually poses no significant challenges, so I will not discuss those in this blogpost. Rather, I will focus on more complex migrations that involve changing the structure of existing tables. In past placements, I have seen many teams approach this differently, and I have seen how difficult it can be to manage data without a proper plan and how easily it can go wrong.
Existing schemas have another problem: they have data! And changing the schema also means changing the data, either by reingesting it or by transforming it. If you have done this before, you surely know about “potential” data loss or irreversible corruption. This is why schema migrations are so challenging and risky. Not to mention that our applications consume the data, so we must also change the code to support changes in the schema.
TL;DR:
If you are one of the lucky ones who is not a data engineer, then here is the summary of this blogpost:
new requirements -> change schema -> change data -> change code -> enjoy new features!
If you are a data engineer or a person who works with data, then you might be interested in the following sections.
Challenges of Schema Migrations
As with everything in life, there is no single “best” approach. You will need to understand your team’s skills, the data you are working with, the tools your company provides, and the requirements you have. But there are some common challenges that you will face when dealing with schema migrations:
- Data Integrity: If something goes wrong, you might end up losing your data or leaving it broken and unrecoverable. Corrupted or irreversibly broken data cannot always be recovered, or even detected immediately.
- Compatibility: Think about backward compatibility, so that old code can still work with the new schema. Also think about forward compatibility, so that new code can work with the old schema (see the sketch after this list). In addition, think about how you plan to deploy schema changes in distributed systems.
- Downtime: A good plan will keep downtime as short as possible, so that the impact on the availability of your services is minimal. Some processes, like reingesting data or running complex transformations, can take a long time to complete.
- Slow Migrations: Reindexing or repartitioning data takes time and can be slow. How will such a migration impact your business? Will it handle inconsistencies in the data?
- Multiple Versions of Schemas: Managing different schema versions across environments (dev, test, prod) can be challenging. You might track schema versions or push changes from one environment to another. Select your strategy and tools wisely.
- Testing: To prevent data loss and corruption, you need to test your schema migrations thoroughly. Check edge cases, NULL values and extreme values. Test environments usually work with small datasets, which can miss issues that only appear at larger scale.
- Tooling: Some ecosystems have developed tooling like Flyway, Liquibase, or Alembic to help with schema migrations. But these tools are usually designed for relational databases and might not work well with dataframes or data lakes.
- Orchestration complexity: Changing the schema might look simple on one system, but not when the data has to be distributed across multiple systems. This is particularly true when working with distributed data lakes, data warehouses and data-consuming microservices.
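To make the compatibility point above more concrete, here is a minimal sketch of a reader that tolerates both the old and the new schema. It assumes the products table and the count column that we will add later in this post; the helper name is mine, not an established API:
from pyspark.sql.functions import lit

def read_products(spark, table_name="spark_catalog.shop.products"):
    # read the table regardless of whether the count column has been added yet
    df = spark.table(table_name)
    if "count" not in df.columns:
        # old schema: synthesize the missing column so downstream code always sees one shape
        df = df.withColumn("count", lit(0))
    return df.select("id", "product", "count", "price")
Old code keeps working because it simply ignores the extra column; new code keeps working because it fills the column in when reading an old table.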
In the following sections, I want to go deeper into these challenges, get our hands dirty and see how we can solve them. I want to explore some traditional solutions, then introduce Delta Lake, a modern data management solution designed to handle schema migrations for dataframes and data lakes. I will show you some of its features and benefits, and how it can simplify and improve data management, particularly in the context of schema migrations and enforcement. But I will also show that Delta Lake schema evolution is not a silver bullet, has its own limitations, and might still leave you with some manual work.
Creating the Table
Before we get into the details of schema migrations, let’s first create the original schema that we will try to migrate:
CREATE TABLE spark_catalog.shop.products (
id INT,
product STRING,
contains_nuts STRING,
price STRING
);
INSERT INTO spark_catalog.shop.products VALUES (1, "butter", "no", "2.40");
INSERT INTO spark_catalog.shop.products VALUES (2, "bread", "yes", "1.20");
INSERT INTO spark_catalog.shop.products VALUES (3, "chorizo", "no", "3.60");
SELECT * FROM spark_catalog.shop.products;
The output should look like this:
+---+-------+-------------+-----+
| id|product|contains_nuts|price|
+---+-------+-------------+-----+
| 1| butter| no| 2.40|
| 2| bread| yes| 1.20|
| 3|chorizo| no| 3.60|
+---+-------+-------------+-----+
Now that we have our table with its FINAL schema and all microservices are consuming data from it, we receive a fresh requirement from the business. People are buying chorizos, but we have none left in stock. The business decides that it would be cute to also store the count of each product we have in stock. Let’s see how we can change the schema and migrate the data.
1. The Naive Approach - A Fresh Slate
Many teams might consider a simple, brute-force approach: manually dropping and recreating the schema whenever changes are needed. This means you are essentially starting from scratch every time. After recreating the table with the new structure, all the data needs to be reingested or transformed.
-- drop the table
DROP TABLE IF EXISTS spark_catalog.shop.products;
-- define new schema
CREATE TABLE spark_catalog.shop.products (
id INT,
product STRING,
count INT,
contains_nuts STRING,
price STRING
);
-- preparing and inserting new data
INSERT INTO spark_catalog.shop.products VALUES (1, "butter", 0, "no", "2.40");
INSERT INTO spark_catalog.shop.products VALUES (2, "bread", 0, "yes", "1.20");
INSERT INTO spark_catalog.shop.products VALUES (3, "chorizo", 0, "no", "3.60");
-- check the new table
SELECT * FROM spark_catalog.shop.products;
The output should look like this:
+---+-------+-----+-------------+-----+
| id|product|count|contains_nuts|price|
+---+-------+-----+-------------+-----+
| 1| butter| 0| no| 2.40|
| 2| bread| 0| yes| 1.20|
| 3|chorizo| 0| no| 3.60|
+---+-------+-----+-------------+-----+
At first glance, this approach looks like a straightforward way to handle schema changes. You avoid having to worry about migrating individual data types or columns, and the process might even feel clean because you’re dealing with a fresh slate.
However, this naive approach quickly reveals its limitations as your system grows more complex and the data scales. Reingesting all the data every time is extremely time-intensive and expensive. For small tables that hold a few hundred records, like metadata or configs, it might not seem like a big deal. But for anything at scale, the time to restore everything grows quickly.
Manual changes also invite human error to the party. I have been there many times and learned my lessons, trust me! You might forget to set up indexes, pick the wrong data type, skip a step, ingest the wrong data or forget to make backups. Schema versions look so similar that it is easy to mix them up when applying changes by hand. Reserve extra time for debugging and remember that manual changes require downtime. As an engineer, do you really want to put yourself under such stress from the business, or be in this position at all? As data systems become more sophisticated and complex, the limitations of the naive approach become more apparent. If only there were a solution that is time-tested, reliable, and can handle schema migrations.
2. Traditional Solutions
Luckily, there are tools like Java’s Flyway and Liquibase, PHP’s Doctrine, JS’s Sequelize or Python’s Alembic that can help with schema migrations. These tools provide a more structured and automated way to evolve database schemas over time. At their core, they track schema changes with version-controlled migration scripts. Instead of writing SQL queries by hand, you can usually define migrations at a higher level, programmatically in the language you use, and use an ORM (Object-Relational Mapping) to map objects to tables. These tools are largely database-agnostic, so once you learn them, you can use them with most relational databases.
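As an illustration of what that looks like in practice, an Alembic migration adding our count column might look roughly like this minimal sketch (the revision ids, file name and table name are made up for the example):
# versions/xxxx_add_count_to_products.py - illustrative Alembic migration
from alembic import op
import sqlalchemy as sa

revision = "a1b2c3d4e5f6"       # hypothetical revision id
down_revision = "f6e5d4c3b2a1"  # hypothetical parent revision

def upgrade():
    # add the new column with a server-side default so existing rows stay valid
    op.add_column("products", sa.Column("count", sa.Integer(), nullable=False, server_default="0"))

def downgrade():
    # keep the migration reversible
    op.drop_column("products", "count")
Running alembic upgrade head applies the pending migrations in order, and the tool records the current revision in the database, which is exactly the kind of bookkeeping you don’t want to do by hand.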
However, they have limitations when used with analytical databases. Mainly, they lack support for dataframes and data lakes. They don’t work natively with dataframe engines (like Apache Spark or Polars) or with distributed file systems and data lakes (like Delta Lake, AWS S3, or even just plain Parquet files). I really tried to use them on large data platforms, but with no success. Let me know if you have had better luck with them.
3. Delta Lake Solution
Simple changes with overwrite first
Delta Lake is a robust solution for handling schema migrations, specifically tailored to modern data engineering needs, including dataframes and data lakes. It has built-in features like schema enforcement and schema evolution (also called schema merging). Let me show you how we could add a count column to our original schema using Delta Lake:
from pyspark.sql.functions import lit

# read the table ("spark" is the active SparkSession, e.g. the one provided by a Databricks notebook)
table_name = "spark_catalog.shop.products"
df = spark.table(table_name)

# add the new column with a default value of 0
df = df.withColumn("count", lit(0))

# write the dataframe back, replacing the existing table
(df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(table_name))
We have loaded the entire table into a dataframe, added a new column count with a default value of 0, and then saved the dataframe back to the same table. We specified mode("overwrite") to replace the existing table with the new one. You might say that this is similar to the naive approach: we just deleted the original table and rewrote it with the new schema.
The output should look like this:
+---+-------+-------------+-----+-----+
| id|product|contains_nuts|price|count|
+---+-------+-------------+-----+-----+
| 1| butter| no| 2.40| 0|
| 2| bread| yes| 1.20| 0|
| 3|chorizo| no| 3.60| 0|
+---+-------+-------------+-----+-----+
Schema Evolution with Delta Lake
We wanted to have the count column right after the product column, but it was appended at the end of the table. Also, we overwrote the entire table, so how can we call this schema evolution? Let’s now use the mergeSchema option on the original schema:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# still using the same table
table_name = "spark_catalog.shop.products"

# defining the new schema with the count column
new_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("count", IntegerType(), True),  # adding the new column
    StructField("contains_nuts", StringType(), True),
    StructField("price", StringType(), True)
])
new_data = [
    (4, "tomato", 5, "no", "0.60"),
    (5, "potato", 10, "no", "0.99"),
]

# create a new dataframe with the new schema
new_df = spark.createDataFrame(new_data, new_schema)

# append the new df to our original table, merging the new schema with the original one
(new_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(table_name))
The mergeSchema option tells Delta Lake to merge the new schema with the existing one. This means that Delta Lake automatically adds the new column to the existing table, without deleting the old data or needing to re-ingest it. We also didn’t overwrite the table; this powerful feature of Delta Lake is called schema evolution.
The output should look like this:
+---+-------+-------------+-----+-----+
| id|product|contains_nuts|price|count|
+---+-------+-------------+-----+-----+
| 1| butter| no| 2.40| null|
| 2| bread| yes| 1.20| null|
| 3|chorizo| no| 3.60| null|
| 4| tomato| no| 0.60| 5|
| 5| potato| no| 0.99| 10|
+---+-------+-------------+-----+-----+
Amazing, right? Delta Lake has made schema migrations so much easier and safer. Delta Lake enforces the existing schema by default and would make our write fail if we did not use the mergeSchema option. This is a huge improvement over the naive approach, which required us to manually drop and recreate the schema. But that’s not all, Delta Lake has more tricks up its sleeve.
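If you want to see the enforcement part for yourself, a small sketch like this (reusing our table and the notebook’s spark session, with a made-up discount column) should be rejected, typically with an AnalysisException, because the extra column does not match the table schema and we did not set mergeSchema:
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

table_name = "spark_catalog.shop.products"
# a dataframe with a column the table does not have
df_with_extra = spark.table(table_name).withColumn("discount", lit(0.0))

try:
    # no mergeSchema here, so Delta Lake should refuse the mismatching schema
    df_with_extra.write.format("delta").mode("append").saveAsTable(table_name)
except AnalysisException as e:
    print("Schema enforcement rejected the write:", e)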
Reordering Columns in Delta Lake
But still, I am not happy with the order of the columns: count should be right after product. Let’s see how we can change the order of columns in Delta Lake.
Delta Lake does not support reordering columns out of the box. But there is a workaround using dataframes. The trick is to read the table into a dataframe, select the columns in the desired order, and overwrite the table with the result. Here is how you can reorder columns in Delta Lake:
# still using the same table
table_name = "spark_catalog.shop.products"

# read the original table
df = spark.table(table_name)

# selecting the columns in the desired order reorders them
new_df = df.select("id", "product", "count", "contains_nuts", "price")

# overwrite the original table with the reordered dataframe
# (depending on your Delta Lake version, .option("overwriteSchema", "true") may be needed for the new column order to stick)
(new_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(table_name))
The output should look like this:
+---+-------+-----+-------------+-----+
| id|product|count|contains_nuts|price|
+---+-------+-----+-------------+-----+
| 1| butter| null| no| 2.40|
| 2| bread| null| yes| 1.20|
| 3|chorizo| null| no| 3.60|
| 4| tomato| 5| no| 0.60|
| 5| potato| 10| no| 0.99|
+---+-------+-----+-------------+-----+
Nice, the count column is where it should be; we are getting there.
Partitioning in Delta Lake
Partitioning is a powerful feature in Delta Lake that can help you optimize your queries and improve performance. Let’s say that we want to partition our table by the contains_nuts column to speed up searches for products based on this column. Here is how we can partition our table in Delta Lake:
# repartition the table by contains_nuts, overwriting the existing table layout and schema
(spark.table("spark_catalog.shop.products").write
    .format("delta")
    .partitionBy("contains_nuts")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("spark_catalog.shop.products"))
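Once the table is partitioned, filters on contains_nuts can skip whole partitions instead of scanning every file. A quick sketch of how a consumer benefits:
from pyspark.sql.functions import col

# only the contains_nuts=no partition should need to be read for this query
nut_free = spark.table("spark_catalog.shop.products").filter(col("contains_nuts") == "no")
nut_free.explain()  # the physical plan should show a partition filter on contains_nuts
nut_free.show()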
Current Schema
Ok, so we made a couple of changes. Let’s see the current schema of our table:
spark.sql("DESCRIBE spark_catalog.shop.products").show()
The output will look similar to mine:
+-------------------------------+-----------------------------------------------------+-------+
| col_name | data_type | comment|
+-------------------------------+-----------------------------------------------------+-------+
| id | int | |
| product | string | |
| count                         | int                                                 |       |
| contains_nuts | string | |
| price | string | |
| # Partition Information | | |
| contains_nuts | string | |
| | | |
| # Detailed Table Information | | |
| Catalog | spark_catalog | |
| Database | shop | |
| Table | products | |
| Created Time | Fri Oct 19 19:54:02 UTC 2024 | |
| Last Access | UNKNOWN | |
| Created By | Spark | |
| Type | MANAGED | |
| Location | s3a://bucket_name/shop/delta/ | |
| Provider | delta | |
| Owner | mr.kecskes@gmail.com | |
| Is_managed_location | true | |
| Table Properties | [delta.minReaderVersion=1,delta.minWriterVersion=2] | |
+-------------------------------+-----------------------------------------------------+-------+
As you can see, there is a lot of interesting metadata about the table, including the schema, partition information, and detailed table information. This can be useful for understanding the structure of your Delta Lake tables and managing them effectively.
Versions in Delta Lake
Like the traditional solutions, Delta Lake also supports versioning of tables. Every time we change the schema or add a partition, a new version is created. But a new version is also created whenever we write data to a Delta Lake table, which is a fundamental difference compared to the versions in traditional solutions. This versioning allows us to track all changes to our schema or data over time and revert to a previous version. They even have a fancy name for it in Delta Lake: time travel. Time travel, because you can easily request previous versions of your data, which can be very useful as a backup for recoveries. If you are sure that you don’t need the previous versions, you can clean them up with the VACUUM command, which removes the data files of old versions that are no longer referenced by the table (after a retention period). Now, let’s see the versions of our table:
spark.sql("DESCRIBE HISTORY spark_catalog.shop.products").show()
The output should look something like this:
+---------+-------------------------------+-------------------+----------------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------+------+-----------------------------------+----------------------+--------------+-------------------+---------------+--------------------------------------------------------------+---------------+--------------------------------------------+
| version | timestamp                     | userId            | userName             | operation                         | operationParameters                                                                                                     | job  | notebook                          | clusterId            | readVersion  | isolationLevel    | isBlindAppend | operationMetrics                                             | userMetadata  | engineInfo                                 |
+---------+-------------------------------+-------------------+----------------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------+------+-----------------------------------+----------------------+--------------+-------------------+---------------+--------------------------------------------------------------+---------------+--------------------------------------------+
| 3 | 2024-10-19T10:58:24.000+00:00 | 1234567890123456 | mr.kecskes@gmail.com | CREATE OR REPLACE TABLE AS SELECT | {"partitionBy":"[\"contains_nuts\"]","description":null,"isManaged":"true","properties":"{}","statsOnLoad":"false"} | null | {"notebookId":"2060321112690202"} | 0123-095432-a5hmaz2y | 2 | WriteSerializable | false | {"numFiles":"1","numOutputRows":"5","numOutputBytes":"880"} | null | Databricks-Runtime/14.3.x-photon-scala2.12 |
| 2 | 2024-10-19T10:56:01.000+00:00 | 1234567890123456 | mr.kecskes@gmail.com | CREATE OR REPLACE TABLE AS SELECT | {"partitionBy":"[]","description":null,"isManaged":"true","properties":"{}","statsOnLoad":"false"} | null | {"notebookId":"2060321112690202"} | 0123-095432-a5hmaz2y | 1 | WriteSerializable | false | {"numFiles":"1","numOutputRows":"5","numOutputBytes":"1112"} | null | Databricks-Runtime/14.3.x-photon-scala2.12 |
| 1 | 2024-10-19T10:53:21.000+00:00 | 1234567890123456 | mr.kecskes@gmail.com | WRITE | {"mode":"Append","statsOnLoad":"false","partitionBy":"[]"} | null | {"notebookId":"2060321112690202"} | 0123-095432-a5hmaz2y | 0 | WriteSerializable | true | {"numFiles":"1","numOutputRows":"3","numOutputBytes":"986"} | null | Databricks-Runtime/14.3.x-photon-scala2.12 |
| 0 | 2024-10-19T10:51:02.000+00:00 | 1234567890123456 | mr.kecskes@gmail.com | CREATE OR REPLACE TABLE AS SELECT | {"partitionBy":"[]","description":null,"isManaged":"true","properties":"{}","statsOnLoad":"false"} | null | {"notebookId":"2060321112690202"} | 0123-095432-a5hmaz2y | null | WriteSerializable | false | {"numFiles":"1","numOutputRows":"3","numOutputBytes":"824"} | null | Databricks-Runtime/14.3.x-photon-scala2.12 |
We can see that there are four versions of our table. The most interesting fields to me are operation and operationParameters. They tell us that version 0 was created when we first created and filled the table, version 1 when we appended the new data with the merged schema (adding count), version 2 when we reordered the columns, and version 3 when we added partitioning. Wait, didn’t I promise that this version metadata would be interesting? The effect of overwriting the schema, either with overwrite or by appending with mergeSchema, is that the version history does not really show us the full picture of how the schema was changed. We can see that the operation is CREATE OR REPLACE TABLE AS SELECT, which to me still looks like a glorified naive approach. Luckily, all this plumbing is given to us for free by Delta Lake, so why not use it, right?
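And because the history gives us version numbers, using them is a one-liner. A minimal time-travel sketch, assuming the version numbers from the history above and a reasonably recent Delta Lake runtime:
# read the table as it looked before we added the count column (version 0 above)
old_df = spark.sql("SELECT * FROM spark_catalog.shop.products VERSION AS OF 0")
old_df.show()

# or roll the whole table back if a migration went wrong
# spark.sql("RESTORE TABLE spark_catalog.shop.products TO VERSION AS OF 0")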
4. Schema Migrations My Own Way
Even with versioning and schema evolution in Delta Lake, we were still not able to truly “alter” the schema; we were still dropping and recreating the table, which is not ideal. Ideally, we would like to have a master prototype of the schema that lives together with the codebase, so that we could apply schema changes with each code deploy. We could then apply those changes to whatever environment we deploy to, so that it is always aligned with the master schema. I am not saying that this is what you want, but it is what I want to entertain you with.
Detect the schemas
Let’s say that we have a master schema that we want to apply to our table. We can compare the master schema with the existing schema of the table and see what the differences are. Here is how we can do that in Delta Lake:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

master_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("row", StringType(), True),
    StructField("shelf", StringType(), True),
    StructField("product", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("price", StringType(), True),
    StructField("expiry_date", DateType(), True)
])
print(master_schema)
existing_schema = spark.table("spark_catalog.shop.products").schema
print(existing_schema)
As you can see below, we first print the newly defined schema, called master_schema, and then the schema that is currently in the table. The output should look like this:
StructType(List(
StructField(id,IntegerType,true),
StructField(row,StringType,true),
StructField(shelf,StringType,true),
StructField(product,StringType,true),
StructField(count,IntegerType,true),
StructField(price,StringType,true),
StructField(expiry_date,DateType,true)
)
)
StructType(List(
StructField(id,IntegerType,true),
StructField(product,StringType,true),
StructField(count,IntegerType,true),
StructField(contains_nuts,StringType,true),
StructField(price,StringType,true)
)
)
Extract the differences
Now that we have the master schema and the existing schema, we can extract the differences between them. Here is how:
# extract fields from schemas
master_fields = set((field.name, field.dataType) for field in master_schema.fields)
existing_fields = set((field.name, field.dataType) for field in existing_schema.fields)
# differences
fields_to_add = master_fields - existing_fields
fields_to_remove = existing_fields - master_fields
print("Fields to add:", fields_to_add)
print("Fields to remove:", fields_to_remove)
The output should look like this:
Fields to add: {('row', StringType), ('shelf', StringType), ('expiry_date', DateType)}
Fields to remove: {('contains_nuts', StringType)}
Apply the changes
With a bit of work, we could extend the above into something that applies the changes to the table, adding new columns and removing old ones. Let me show you how with the following code:
# extract fields and data types, keeping their positions
existing_schema_fields = [(i, f.name, f.dataType) for i, f in enumerate(existing_schema.fields)]
master_schema_fields = [(i, f.name, f.dataType) for i, f in enumerate(master_schema.fields)]

# convert pyspark data types to SQL types
def spark_type_to_sql_type(spark_type):
    if isinstance(spark_type, IntegerType):
        return "INT"
    elif isinstance(spark_type, StringType):
        return "STRING"
    elif isinstance(spark_type, DateType):
        return "DATE"
    else:
        return str(spark_type)

existing_names = [name for (_, name, _) in existing_schema_fields]
master_names = [name for (_, name, _) in master_schema_fields]
fields_to_add = [(i, f_name, f_type) for (i, f_name, f_type) in master_schema_fields if f_name not in existing_names]
fields_to_remove = [(i, f_name, f_type) for (i, f_name, f_type) in existing_schema_fields if f_name not in master_names]

alter_table_queries = []
for (i, f_name, f_type) in fields_to_add:
    sql_type = spark_type_to_sql_type(f_type)
    # place the new column right after the column that precedes it in the master schema
    previous_field_in_master = master_schema_fields[i - 1][1] if i > 0 else None
    if previous_field_in_master:
        query = f"ALTER TABLE {table_name} ADD COLUMN {f_name} {sql_type} AFTER {previous_field_in_master};"
    else:
        query = f"ALTER TABLE {table_name} ADD COLUMN {f_name} {sql_type} FIRST;"
    alter_table_queries.append(query)

for (i, f_name, f_type) in fields_to_remove:
    alter_table_queries.append(f"ALTER TABLE {table_name} DROP COLUMN {f_name};")

for query in alter_table_queries:
    print(query)
    # strip the trailing semicolon, which some Spark versions do not accept in spark.sql()
    spark.sql(query.rstrip(";"))
The output should look like this:
ALTER TABLE spark_catalog.shop.products ADD COLUMN row STRING AFTER id;
ALTER TABLE spark_catalog.shop.products ADD COLUMN shelf STRING AFTER row;
ALTER TABLE spark_catalog.shop.products ADD COLUMN expiry_date DATE AFTER price;
ALTER TABLE spark_catalog.shop.products DROP COLUMN contains_nuts;
We can see that the schema was changed by adding the new columns and removing the old ones. This is not a fully automated solution, but it is a good starting point for building a more sophisticated schema migration tool. You could extend it to handle more complex schema changes, such as changing data types, reordering columns or updating partitions. Note that, depending on your Delta Lake version, ALTER TABLE ... DROP COLUMN may require column mapping to be enabled on the table.
I was not setting any default values for the new columns, but I believe you could easily extend the above code to handle that as well with an ALTER TABLE ... SET DEFAULT ... statement, where the default value could be a constant or a function.
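As a rough sketch only (the exact syntax and the required column-defaults table feature depend on your Delta Lake or Databricks runtime version), it could look something like this:
# enable column defaults on the table first (assumes a newer Delta Lake runtime that supports this table feature)
spark.sql("ALTER TABLE spark_catalog.shop.products SET TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')")
# then give the count column a default value for future inserts
spark.sql("ALTER TABLE spark_catalog.shop.products ALTER COLUMN count SET DEFAULT 0")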
Conclusion
As you can see, schema migration is not a simple task, but it is crucial for data management. You can go with the naive approach in certain cases, but I would not recommend it for large-scale systems. Traditional solutions like Flyway or Liquibase are great for relational databases, but they lack support for dataframes and data lakes. Delta Lake is a modern data management solution that is specifically designed to handle schema migrations for dataframes and data lakes. It has built-in features like schema evolution and enforcement, which make it much easier to evolve your schema over time without losing data or incurring significant downtime. Delta Lake also supports versioning of tables, which allows you to track changes to your data over time and revert to previous versions. However, Delta Lake schema evolution is not a silver bullet and has its own limitations. You might still need to implement your own solutions for more complex schema changes, such as changing data types, reordering columns, and adding partitions. But Delta Lake is a great starting point for handling schema migrations in modern data engineering.
I hope that this blogpost has given you a good overview of schema migrations and how you can handle them with Delta Lake. If you have any questions or feedback, please feel free to leave a comment. I would love to hear from you and help you with any issues you might be facing. Thank you for reading and happy data engineering!