Examples of use: spalah.dataframe
This module contains various dataframe-specific functions and classes, such as SchemaComparer, script_dataframe, slice_dataframe, etc.
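All of the examples below assume an active SparkSession named spark and can be run after importing the relevant members:

from spalah.dataframe import (
    SchemaComparer,
    flatten_schema,
    script_dataframe,
    slice_dataframe,
)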
slice_dataframe
Slice the schema of a dataframe by selecting which attributes must be included and/or excluded. The function also supports complex structures and can be helpful when sensitive/PII attributes must be cut off during data transformation:
from spalah.dataframe import slice_dataframe
df = spark.sql(
'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()
""" output:
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
| |-- Line2: string (nullable = false)
"""
# Create a new dataframe by cutting off root and nested attributes
df_result = slice_dataframe(
input_dataframe=df,
columns_to_include=["Name", "Address"],
columns_to_exclude=["Address.Line2"]
)
df_result.printSchema()
""" output:
root
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
"""
Alternatively, excluded columns can be nullified instead of removed:
df_result = slice_dataframe(
input_dataframe=df,
columns_to_include=["Name", "Address"],
columns_to_exclude=["Address.Line2"],
nullify_only=True
)
df_result.show()
""" output:
+----+----+-------------+
| ID|Name| Address|
+----+----+-------------+
|null|John|{line1, null}|
+----+----+-------------+
"""
Besides regular nested structs, slicing is also supported for structs inside arrays, including multiple levels of nesting.
The following example schema contains structs nested in arrays at several levels:
root
|-- parent_struct: struct (nullable = false)
| |-- struct_in_array: array (nullable = false)
| | |-- element: struct (containsNull = false)
| | | |-- a: integer (nullable = false)
| | | |-- b: integer (nullable = false)
| | | |-- c_array: array (nullable = false)
| | | | |-- element: struct (containsNull = false)
| | | | | |-- cc: string (nullable = false)
| | | | | |-- dd: string (nullable = false)
| | | | | |-- g: array (nullable = false)
| | | | | | |-- element: struct (containsNull = false)
| | | | | | | |-- ee: string (nullable = false)
| | | | | | | |-- ff: integer (nullable = false)
| | | | | |-- h: struct (nullable = false)
| | | | | | |-- zz: string (nullable = false)
| | | | | | |-- yy: integer (nullable = false)
| |-- storeid: integer (nullable = false)
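For reference, a dataframe with this schema (referred to below as df_h) can be built like this; the construction is only an illustration and any literal values would do:

df_h = spark.sql(
    """
    SELECT
        struct(
            array(
                struct(
                    1 AS a,
                    2 AS b,
                    array(
                        struct(
                            "cc" AS cc,
                            "dd" AS dd,
                            array(struct("ee" AS ee, 1 AS ff)) AS g,
                            struct("zz" AS zz, 2 AS yy) AS h
                        )
                    ) AS c_array
                )
            ) AS struct_in_array,
            1 AS storeid
        ) AS parent_struct
    """
)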
The following snippet slices the dataframe so that only a subset of the schema remains, spanning multiple levels of arrays:
df_out = slice_dataframe(
input_dataframe=df_h,
columns_to_include=["parent_struct.struct_in_array.c_array.g"],
nullify_only=False,
debug=False,
)
df_out.printSchema()
root
|-- parent_struct: struct (nullable = false)
| |-- struct_in_array: array (nullable = false)
| | |-- element: struct (containsNull = false)
| | | |-- c_array: array (nullable = false)
| | | | |-- element: struct (containsNull = false)
| | | | | |-- g: array (nullable = false)
| | | | | | |-- element: struct (containsNull = false)
| | | | | | | |-- ee: string (nullable = false)
| | | | | | | |-- ff: integer (nullable = false)
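Note that ancestors of an included nested attribute are kept automatically: although only parent_struct.struct_in_array.c_array.g was listed, the enclosing parent_struct, struct_in_array, and c_array containers remain in the result.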
Another example inverts the output by excluding the child array element parent_struct.struct_in_array.c_array:
df_out = slice_dataframe(
input_dataframe=df_h,
columns_to_exclude=["parent_struct.struct_in_array.c_array"],
nullify_only=False,
debug=False,
)
df_out.printSchema()
root
|-- parent_struct: struct (nullable = false)
| |-- struct_in_array: array (nullable = false)
| | |-- element: struct (containsNull = false)
| | | |-- a: integer (nullable = false)
| | | |-- b: integer (nullable = false)
| |-- storeid: integer (nullable = false)
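Conversely, excluding c_array removes it together with everything nested below it, while its siblings a and b and the unrelated storeid are left intact.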
flatten_schema
The flatten_schema function can be helpful when dealing with complex (nested) data types.
To see it in action, let's define a sample dataframe:
df_complex_schema = spark.sql(
'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df_complex_schema.printSchema()
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
| |-- Line2: string (nullable = false)
flatten_schema(df_complex_schema.schema)
['ID', 'Name', 'Address.Line1', 'Address.Line2']
flatten_schema(
schema=df_complex_schema.schema,
include_datatype=True
)
[
('ID', 'IntegerType'),
('Name', 'StringType'),
('Address.Line1', 'StringType'),
('Address.Line2', 'StringType')
]
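Since the flattened names use the same dotted notation that slice_dataframe accepts, the two functions compose naturally. A minimal sketch (pii_columns and df_masked are illustrative names, and the filtering predicate is purely for demonstration):

from spalah.dataframe import flatten_schema, slice_dataframe

# Collect dotted paths of attributes to mask (illustrative predicate)
pii_columns = [
    name
    for name in flatten_schema(df_complex_schema.schema)
    if name.endswith("Line2")
]

# Nullify the matched attributes while preserving the schema
df_masked = slice_dataframe(
    input_dataframe=df_complex_schema,
    columns_to_exclude=pii_columns,
    nullify_only=True,
)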
script_dataframe
Pass a dataframe to script_dataframe to get a script that recreates the dataframe in another environment. The function can generate a script for dataframes with up to 20 rows. Use .limit(20) to reduce the number of rows when needed:
from spalah.dataframe import script_dataframe
script = script_dataframe(df)
print(script)
from pyspark.sql import Row
import datetime
from decimal import Decimal
from pyspark.sql.types import *
# Scripted data and schema:
__data = [Row(ID=1, Name='John', Address=Row(Line1='line1', Line2='line2'))]
__schema = {'type': 'struct', 'fields': [{'name': 'ID', 'type': 'integer', 'nullable': False, 'metadata': {}}, {'name': 'Name', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Address', 'type': {'type': 'struct', 'fields': [{'name': 'Line1', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Line2', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'nullable': False, 'metadata': {}}]}
outcome_dataframe = spark.createDataFrame(__data, StructType.fromJson(__schema))
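The generated script is self-contained: pasted into another environment (or executed in place), it recreates the dataframe as outcome_dataframe. A quick round-trip check, assuming spark is available in the current scope:

# Execute the generated script in the current session and compare
exec(script)
assert outcome_dataframe.collect() == df.collect()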
SchemaComparer
Let's define source and target dataframes to be used in the schema comparison. The target schema contains a few adjustments that the class is expected to catch and display:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# A source dataframe
df_source = spark.sql(
'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df_source.printSchema()
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- Address: struct (nullable = false)
| |-- Line1: string (nullable = false)
| |-- Line2: string (nullable = false)
# A target dataframe
df_target = spark.sql(
'SELECT "a" as ID, "John" AS name, struct("line1" AS Line1) AS Address'
)
df_target.printSchema()
root
|-- ID: string (nullable = false) # Changed data type
|-- name: string (nullable = false) # Changed case of the column name
|-- Address: struct (nullable = false) # Removed field Line2
| |-- Line1: string (nullable = false)
Then, let's instantiate a SchemaComparer and run the comparison:
from spalah.dataframe import SchemaComparer
schema_comparer = SchemaComparer(
source_schema = df_source.schema,
target_schema = df_target.schema
)
schema_comparer.compare()
The comparison results are stored in the instance properties matched and not_matched:
schema_comparer.matched
[MatchedColumn(name='Address.Line1', data_type='StringType')]
schema_comparer.not_matched
[
NotMatchedColumn(
name='name',
data_type='StringType',
reason="The column exists in source and target schemas but it's name is case-mismatched"
),
NotMatchedColumn(
name='ID',
data_type='IntegerType <=> StringType',
reason='The column exists in source and target schemas but it is not matched by a data type'
),
NotMatchedColumn(
name='Address.Line2',
data_type='StringType',
reason='The column exists only in the source schema'
)
]
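The not_matched list makes it easy to fail fast on schema drift; a minimal sketch:

if schema_comparer.not_matched:
    # Report every mismatch before aborting the pipeline
    for column in schema_comparer.not_matched:
        print(f"{column.name}: {column.reason}")
    raise ValueError("Source and target schemas do not match")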