Data Flow supports Delta Lake by default when your
Applications run Spark 3.2.1 or later.
Delta Lake lets you build a lakehouse architecture on top of your existing data lakes. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. Delta Lake 3.1.0 is supported with the Data Flow Spark 3.5.0 processing engine, and Delta Lake 2.0.1 and 1.2.1 are supported with the Data Flow Spark 3.2.1 processing engine.
To use Delta Lake with Data Flow, the Spark version of the Data Flow Application must be 3.2.1 or later.
Follow these steps to load Delta Lake for use with Data Flow.
Use the Spark configuration property, spark.oracle.deltalake.version, to specify which version of Delta Lake to use. Set it to one of the following values:
spark.oracle.deltalake.version values

Spark Version | Value of spark.oracle.deltalake.version | Binaries loaded
3.5.0         | 3.1.0                                   | Delta Lake 3.1.0
3.2.1         | 2.0.1                                   | Delta Lake 2.0.1
3.2.1         | 1.2.1                                   | Delta Lake 1.2.1
3.5.0, 3.2.1  | none                                    | No Delta Lake binaries are loaded; you must provide them.
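For example, assuming the property is exposed to the running job through the normal Spark configuration API, you can read it back at runtime to confirm which value is in effect. This is a minimal Scala sketch; the object name is illustrative only.

import org.apache.spark.sql.SparkSession

// Minimal sketch: print the value of spark.oracle.deltalake.version for this run,
// if the property was set on the Data Flow Application.
object CheckDeltaLakeVersion {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    val version = spark.conf
      .getOption("spark.oracle.deltalake.version")
      .getOrElse("not set")
    println(s"spark.oracle.deltalake.version = $version")
  }
}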
Note
If you don't set a value for spark.oracle.deltalake.version, then the Delta Lake 1.2.1 binaries are loaded by default.
If you set spark.oracle.deltalake.version to none, then
you must provide the Delta Lake dependency libraries as part of the application JAR.
More information is available in the Delta Lake public documentation.
For example, to load Delta Lake 3.1.0, package these libraries:
delta-storage-3.1.0.jar
delta-spark_2.12-3.1.0.jar
delta-contribs_2.12-3.1.0.jar
and follow these steps:
For Java or Scala applications, provide the Delta Lake 3.1.0 dependency from the Maven repository.
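For example, with an sbt build the jars listed above resolve from Maven Central with the following coordinates. This is a sketch only; the same group and artifact IDs apply to Maven or Gradle builds, and the jars still need to be bundled into the application JAR, for example with an assembly plugin.

// Sketch (sbt): the Maven Central coordinates of the three Delta Lake 3.1.0 jars listed above.
// "%%" appends the Scala binary version (for example _2.12) to the artifact name;
// delta-storage is a plain Java artifact, so it uses "%".
libraryDependencies ++= Seq(
  "io.delta" %% "delta-spark"    % "3.1.0",  // delta-spark_2.12-3.1.0.jar
  "io.delta" %  "delta-storage"  % "3.1.0",  // delta-storage-3.1.0.jar
  "io.delta" %% "delta-contribs" % "3.1.0"   // delta-contribs_2.12-3.1.0.jar
)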
The following samples show how to use the Delta Lake API with Data Flow.
The Data Flow Spark engine supports the delta format by default. Delta Lake APIs are available for Java, Python, and Scala. If you're using the Delta Lake Python APIs, use the custom archive.zip dependency packager and include the delta-spark package, as described in Spark-Submit Functionality in Data Flow.
Usage Samples
Java or Scala
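// Read a Delta table into a DataFrame, and write a DataFrame out in delta format.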
spark.read().format("delta").load(<path_to_Delta_table>)
df.write().format("delta").save(<path_to_Delta_table>)
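// Open the table with the DeltaTable API and remove files that are no longer
// referenced by the table (subject to the retention period).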
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
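Putting these calls together, a self-contained Scala sketch might look like the following. The Object Storage path, column names, and object name are placeholders for illustration; the SparkSession is obtained with getOrCreate() as usual.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Sketch: write a small DataFrame as a Delta table, read it back, then vacuum it.
object DeltaLakeSample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("delta-lake-sample").getOrCreate()
    import spark.implicits._

    // Placeholder path; replace with your own bucket and namespace.
    val tablePath = "oci://<bucket>@<namespace>/delta/events"

    // Write a DataFrame in delta format.
    Seq((1, "create"), (2, "update")).toDF("id", "action")
      .write.format("delta").mode("overwrite").save(tablePath)

    // Read the Delta table back as a DataFrame.
    spark.read.format("delta").load(tablePath).show()

    // Open the table through the DeltaTable API and remove files that are
    // no longer referenced by the table (subject to the retention period).
    val deltaTable = DeltaTable.forPath(spark, tablePath)
    deltaTable.vacuum()

    spark.stop()
  }
}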