spalah
Spalah is a set of Python helpers for working with PySpark dataframes, transformations, schemas, etc.
Its main feature is simplifying work with advanced Spark schemas: think nested structures, arrays, arrays of structs nested inside arrays. Such schemas do come up, especially when a lakehouse stores ingested JSON datasets as-is.
And, by the way, the word "spalah" means "spark" in Ukrainian 🇺🇦 :)
Documentation: https://avolok.github.io/spalah
Source Code for spalah: https://github.com/avolok/spalah
Installation
Use the package manager pip to install spalah.
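pip install spalah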
Examples
Slicing complex schema by removing (or nullifying) nested elements
from spalah.dataframe import slice_dataframe
df = spark.sql(
    'SELECT 1 AS ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()
""" output:
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
| |-- Line2: string (nullable = false)
"""
# Create a new dataframe by cutting off root and nested attributes
df_result = slice_dataframe(
input_dataframe=df,
columns_to_include=["Name", "Address"],
columns_to_exclude=["Address.Line2"]
)
df_result.printSchema()
""" output:
root
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
"""
Note
Besides regular nested structs, slicing of structs inside arrays is also supported, including multiple levels of nesting.
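The section title above also mentions nullifying. A minimal sketch of that mode, assuming it is toggled by a nullify_only parameter on slice_dataframe (the parameter name is an assumption, not confirmed by this README): excluded attributes stay in the schema but their values are set to null instead of being dropped.

from spalah.dataframe import slice_dataframe

# Assumption: `nullify_only=True` keeps excluded attributes in the schema
# and nullifies their values rather than removing the columns entirely.
df_nullified = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["Name", "Address"],
    columns_to_exclude=["Address.Line2"],
    nullify_only=True,
)

# Expected: Address.Line2 is still present in the schema but contains nulls
df_nullified.printSchema()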
Get a list of flattened elements from a complex schema
from spalah.dataframe import flatten_schema
# Pass the dataframe's schema to get all attributes as a flat, one-dimensional list
flatten_schema(df_complex_schema.schema)
""" output:
['ID', 'Name', 'Address.Line1', 'Address.Line2']
"""
from spalah.dataframe import flatten_schema
# Alternatively, the function can also return the data type of each attribute
flatten_schema(
schema=df_complex_schema.schema,
include_datatype=True
)
""" output:
[
('ID', 'IntegerType'),
('Name', 'StringType'),
('Address.Line1', 'StringType'),
('Address.Line2', 'StringType')
]
"""
Set Delta Table properties and check constraints
from spalah.dataset import DeltaTableConfig
dp = DeltaTableConfig(table_path="/tmp/nested_schema_dataset")
dp.properties = {
"delta.logRetentionDuration": "interval 10 days",
"delta.deletedFileRetentionDuration": "interval 15 days"
}
dp.check_constraints = {'id_is_not_null': 'id is not null'}
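Since properties and check_constraints are set via plain attribute assignment, a reasonable assumption is that the same attributes also act as getters; a sketch under that assumption (an active Spark session is required either way):

# Assumption: reading the attributes returns the current table configuration
print(dp.properties)
# {'delta.logRetentionDuration': 'interval 10 days',
#  'delta.deletedFileRetentionDuration': 'interval 15 days'}

print(dp.check_constraints)
# {'id_is_not_null': 'id is not null'}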
See more examples in the documentation: examples: dataframes and examples: dataset.
License
This project is licensed under the terms of the MIT license.