spalah

Spalah is a set of Python helpers for dealing with PySpark dataframes, transformations, schemas, etc.

Its main feature is simplifying work with advanced Spark schemas: think nested structures, arrays, arrays in arrays in nested structures in arrays. Such schemas happen, especially when a lakehouse stores ingested JSON datasets as is.

And... the word "spalah" means "spark" in Ukrainian 🇺🇦 :)



Documentation: https://avolok.github.io/spalah
Source Code for spalah: https://github.com/avolok/spalah


Installation

Use the package manager pip to install spalah.

pip install spalah

Examples

Slicing complex schema by removing (or nullifying) nested elements

from spalah.dataframe import slice_dataframe

df = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()

""" output:
root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
 |    |-- Line2: string (nullable = false)
"""

# Create a new dataframe by cutting off root and nested attributes
df_result = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["Name", "Address"],
    columns_to_exclude=["Address.Line2"]
)
df_result.printSchema()

""" output:
root
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
"""

Note

Besides regular nested structs, slicing structs within arrays is also supported, including multiple levels of nesting; see the sketch below.
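
For illustration, a minimal sketch of slicing a struct inside an array (the Addresses column is hypothetical, and it is assumed that the same dot-notation paths address struct fields within array elements):

from spalah.dataframe import slice_dataframe

# An array of structs: each element has Line1 and Line2
df_nested = spark.sql(
    'SELECT 1 AS ID, array(struct("line1" AS Line1, "line2" AS Line2)) AS Addresses'
)

# Exclude Line2 from every element of the Addresses array
df_sliced = slice_dataframe(
    input_dataframe=df_nested,
    columns_to_include=["Addresses"],
    columns_to_exclude=["Addresses.Line2"]
)
df_sliced.printSchema()

""" expected output, under the assumptions above:
root
 |-- Addresses: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Line1: string (nullable = false)
"""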

Get list of flattened elements from the complex schema

from spalah.dataframe import flatten_schema

# Pass the dataframe's schema to get all attributes as a flat, one-dimensional list
flatten_schema(df_complex_schema.schema)

""" output:
['ID', 'Name', 'Address.Line1', 'Address.Line2']
"""

from spalah.dataframe import flatten_schema

# Alternatively, the function can return data types of the attributes
flatten_schema(
    schema=df_complex_schema.schema,
    include_datatype=True
)

""" output:
[
    ('ID', 'IntegerType'),
    ('Name', 'StringType'),
    ('Address.Line1', 'StringType'),
    ('Address.Line2', 'StringType')
]
"""

Set Delta Table properties and check constraints

from spalah.dataset import DeltaTableConfig

dp = DeltaTableConfig(table_path="/tmp/nested_schema_dataset")

dp.properties = {
    "delta.logRetentionDuration": "interval 10 days",
    "delta.deletedFileRetentionDuration": "interval 15 days"
}
dp.check_constraints = {'id_is_not_null': 'id is not null'} 
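
Because properties and check_constraints are exposed as attributes, the current values can also be read back; a sketch, assuming the getters return the table's live configuration:

# Read the current table properties and constraints back
print(dp.properties)
# {'delta.logRetentionDuration': 'interval 10 days', 'delta.deletedFileRetentionDuration': 'interval 15 days'}

print(dp.check_constraints)
# {'id_is_not_null': 'id is not null'}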

See more examples in the documentation pages examples: dataframes and examples: dataset.

License

This project is licensed under the terms of the MIT license.