
spalah.dataset.DeltaTableConfig

spalah.dataset.DeltaTableConfig.DeltaTableConfig(table_path='', table_name='', spark_session=None)

Manages Delta table properties, check constraints, and related table metadata.

Attributes:

keep_existing_properties (bool): Preserves existing table properties if they are not present in the input value. Defaults to False.
keep_existing_check_constraints (bool): Preserves existing check constraints if they are not present in the input value. Defaults to False.

Parameters:

table_path (str, optional): Path to the Delta table. For instance: /mnt/db1/table1. Defaults to ''.
table_name (str, optional): Delta table name. For instance: db1.table1. Defaults to ''.
spark_session (SparkSession, optional): The current Spark session. Defaults to None.

Raises:

ValueError: if values for both 'table_path' and 'table_name' are provided; set only one of them.
ValueError: if neither 'table_path' nor 'table_name' is provided; set one of them.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>> print(dp.properties)
{'delta.deletedFileRetentionDuration': 'interval 15 days'}

Source code in spalah/dataset/DeltaTableConfig.py
def __init__(
    self,
    table_path: str = "",
    table_name: str = "",
    spark_session: SparkSession | None = None,
) -> None:
    """
    Args:
        table_path (str, optional): Path to delta table. For instance: /mnt/db1/table1
        table_name (str, optional): Delta table name. For instance: db1.table1
        spark_session (SparkSession, optional): The current Spark session.
    Raises:
        ValueError: if both 'table_path' and 'table_name' are provided;
                    set only one of them
        ValueError: if neither 'table_path' nor 'table_name' is provided;
                    set one of them
    Examples:
        >>> from spalah.dataset import DeltaTableConfig
        >>> dp = DeltaTableConfig(table_path="/path/dataset")
        >>> print(dp.properties)
        {'delta.deletedFileRetentionDuration': 'interval 15 days'}
    """

    if spark_session is None:
        spark = SparkSession.getActiveSession()
        if spark is None:
            raise ValueError(
                "No active Spark session found. Please provide a valid SparkSession."
            )
        else:
            self.spark_session: SparkSession = spark
    else:
        self.spark_session: SparkSession = spark_session

    self.table_name = self.__get_table_identifier(table_path=table_path, table_name=table_name)
    self.original_table_name = table_name
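The constructor enforces that exactly one of table_path and table_name is set, then resolves a single table identifier from it. The logic can be sketched as follows; the delta.`/path` identifier form is the standard Spark SQL spelling for path-based Delta tables, but the exact behaviour of the private __get_table_identifier helper is an assumption here, not its actual implementation:

```python
def get_table_identifier(table_path: str = "", table_name: str = "") -> str:
    """Sketch of the path/name validation (illustrative, not library code)."""
    if table_path and table_name:
        raise ValueError(
            "Both 'table_path' and 'table_name' provided; set only one of them."
        )
    if not table_path and not table_name:
        raise ValueError(
            "Neither 'table_path' nor 'table_name' provided; set one of them."
        )
    if table_path:
        # Path-based tables are addressed in Spark SQL as delta.`/path`
        return f"delta.`{table_path}`"
    return table_name
```

Either form can then be used directly in SQL statements such as DESCRIBE DETAIL or ALTER TABLE.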

properties property writable

Gets/sets dataset's delta table properties.

Parameters:

value (Dict): An input dictionary in the format: {"property_name": "value"}. Required.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing properties
>>> print(dp.properties)
{'delta.deletedFileRetentionDuration': 'interval 15 days'}
>>>
>>> # Adjust the property value from 15 to 30 days
>>> dp.properties = {'delta.deletedFileRetentionDuration': 'interval 30 days'}
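On the Spark side, setting this property boils down to ALTER TABLE ... SET/UNSET TBLPROPERTIES statements, with keep_existing_properties deciding whether properties absent from the input are left alone or unset. A rough sketch of that merge logic, runnable without Spark (the function name and exact SQL text are illustrative assumptions, not the library's internals):

```python
def plan_property_changes(
    existing: dict, desired: dict, keep_existing: bool = False
) -> list[str]:
    """Sketch of the ALTER TABLE statements a property setter could emit."""
    statements = []
    # Only touch properties whose value actually differs.
    to_set = {k: v for k, v in desired.items() if existing.get(k) != v}
    if to_set:
        pairs = ", ".join(f"'{k}' = '{v}'" for k, v in to_set.items())
        statements.append(f"ALTER TABLE t SET TBLPROPERTIES ({pairs})")
    if not keep_existing:
        # Properties missing from the input are unset.
        to_unset = [k for k in existing if k not in desired]
        if to_unset:
            keys = ", ".join(f"'{k}'" for k in to_unset)
            statements.append(f"ALTER TABLE t UNSET TBLPROPERTIES ({keys})")
    return statements
```

With keep_existing=True the second branch is skipped, which mirrors what the keep_existing_properties attribute documents above.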

check_constraints property writable

Gets/sets dataset's delta table check constraints.

Parameters:

value (Dict): An input dictionary in the format: {"constraint_name": "constraint_condition"}. Required.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing constraints
>>> print(dp.check_constraints)
{}
>>>
>>> # Add a new check constraint
>>> dp.check_constraints = {'id_is_not_null': 'id is not null'}
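Delta Lake manages check constraints through ALTER TABLE ... ADD/DROP CONSTRAINT statements, so a setter like this conceptually diffs the desired constraints against the existing ones. A sketch of that diff, analogous to the property merge above (names and SQL text are illustrative assumptions, not the library's internals):

```python
def plan_constraint_changes(
    existing: dict, desired: dict, keep_existing: bool = False
) -> list[str]:
    """Sketch of the ALTER TABLE statements a constraint setter could emit."""
    statements = []
    if not keep_existing:
        # Constraints missing from the input are dropped.
        for name in existing:
            if name not in desired:
                statements.append(f"ALTER TABLE t DROP CONSTRAINT {name}")
    for name, condition in desired.items():
        if existing.get(name) != condition:
            statements.append(
                f"ALTER TABLE t ADD CONSTRAINT {name} CHECK ({condition})"
            )
    return statements
```

As with properties, keep_existing_check_constraints=True preserves constraints that the input dictionary does not mention.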

columns property

Gets dataset's delta table columns and their data types.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing columns
>>> print(dp.columns)
{"id": "int", "name": "string", "age": "int"}

clustering_columns property

Gets dataset's delta table clustering columns.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing clustering columns
>>> print(dp.clustering_columns)
["column1", "column2"]

partition_columns property

Gets dataset's delta table partition columns.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing partition columns
>>> print(dp.partition_columns)
["column1", "column2"]

details property

Gets dataset's delta table details including columns, properties, constraints, clustering columns and partition columns.

Examples:

>>> from spalah.dataset import DeltaTableConfig
>>> dp = DeltaTableConfig(table_path="/path/dataset")
>>>
>>> # get existing table details
>>> print(dp.details)
{
    "columns": {"id": "int", "name": "string", "age": "int"},
    "properties": {'delta.deletedFileRetentionDuration': 'interval 15 days'},
    "constraints": {"id_check": "id is not null"},
    "clustering_columns": [],
    "partition_columns": ["id"]
}
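Because details returns a plain dictionary, it lends itself to programmatic checks in tests or deployment scripts. A small illustration using the example output above as a literal (the dict is copied from the example, not fetched from a live table):

```python
# Example details payload, matching the structure documented above.
details = {
    "columns": {"id": "int", "name": "string", "age": "int"},
    "properties": {"delta.deletedFileRetentionDuration": "interval 15 days"},
    "constraints": {"id_check": "id is not null"},
    "clustering_columns": [],
    "partition_columns": ["id"],
}

# Assert the table is partitioned as expected and that every
# partition column actually exists in the schema.
assert details["partition_columns"] == ["id"]
assert all(c in details["columns"] for c in details["partition_columns"])
```

The same pattern works for guarding deployments, e.g. failing a pipeline when an expected constraint or property is missing.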