Simplify Data Observability with OpenLineage

Willy Lulciuc

staff
Tags: tutorial, openlineage, spark, iceberg, marquez, data-observability

To demonstrate how simple data observability with OpenLineage can be, we'll adapt the example Iceberg table demo.nyc.taxis from the Iceberg quickstart guide for Spark. Our team is based in SF, so naturally, we'll use demo.sf.waymo instead.

Below, we use Marquez to visualize the data lineage graph. We also recommend following our Spark tutorial for oleander.

Create spark-with-iceberg.py

First, let's define a simple Python script that runs commands against our Iceberg table in Spark. Make sure you mount spark-with-iceberg.py as a volume in the spark-iceberg container.

    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        DoubleType,
        FloatType,
        LongType,
        StringType,
        StructField,
        StructType,
    )

    sc = SparkContext('local')
    spark = SparkSession(sc)

    # (1) Create the 'demo.sf.waymo' Iceberg table from an empty DataFrame with an explicit schema
    schema = StructType([
        StructField("vendor_id", LongType(), True),
        StructField("trip_id", LongType(), True),
        StructField("trip_distance", FloatType(), True),
        StructField("fare_amount", DoubleType(), True),
        StructField("store_and_fwd_flag", StringType(), True)
    ])
    df = spark.createDataFrame([], schema)
    df.writeTo("demo.sf.waymo").create()

    # (2) Append rows to the 'demo.sf.waymo' Iceberg table, reusing the table's own schema
    schema = spark.table("demo.sf.waymo").schema
    data = [
        (1, 1000371, 1.8, 15.32, "N"),
        (2, 1000372, 2.5, 22.15, "N"),
        (2, 1000373, 0.9, 9.01, "N"),
        (1, 1000374, 8.4, 42.13, "Y")
    ]
    df = spark.createDataFrame(data, schema)
    df.writeTo("demo.sf.waymo").append()

    # (3) Read from the 'demo.sf.waymo' Iceberg table; show() prints the rows and returns None
    spark.table("demo.sf.waymo").show()
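One thing worth checking before step (2): as far as we know, PySpark verifies each row against an explicit schema when createDataFrame is called, so a Python int in a FloatType column is rejected instead of being cast. The sketch below is a plain-Python pre-check and is not part of the original script. The EXPECTED mapping and the check_rows helper are hypothetical names introduced for illustration.

```python
# Hypothetical pre-check: Python types assumed acceptable for each column of demo.sf.waymo.
EXPECTED = [
    ("vendor_id", int),           # LongType
    ("trip_id", int),             # LongType
    ("trip_distance", float),     # FloatType
    ("fare_amount", float),       # DoubleType
    ("store_and_fwd_flag", str),  # StringType
]


def check_rows(rows):
    """Return a list of readable problems; an empty list means every row lines up."""
    problems = []
    for i, row in enumerate(rows):
        if len(row) != len(EXPECTED):
            problems.append(f"row {i}: expected {len(EXPECTED)} values, got {len(row)}")
            continue
        for value, (name, py_type) in zip(row, EXPECTED):
            # None is allowed because every field in the schema is nullable.
            if value is None:
                continue
            # bool is rejected explicitly because it is a subclass of int in Python.
            if isinstance(value, bool) or not isinstance(value, py_type):
                problems.append(
                    f"row {i}: {name} should be {py_type.__name__}, "
                    f"got {type(value).__name__}"
                )
    return problems


data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y"),
]
print(check_rows(data))
```

Running the check on the four sample rows reports no problems. A row such as (1, 1000375, 2, 10.0, "N") would be flagged because trip_distance is an int.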

Docker-Compose

Next, we need to modify marquez/docker-compose.yml to use the tabulario/spark-iceberg image, as outlined in the quickstart guide for Iceberg. Then, run:

    $ ./docker/up.sh --build \
        --api-port 9003 \
        --api-admin-port 9004 \
        --web-port 3001
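Before submitting any Spark job, it can help to confirm that the Marquez API is answering on the port passed to up.sh. The following is a minimal sketch using only the Python standard library. It assumes the API is published on localhost:9003 and that the namespaces endpoint returns a JSON object with a "namespaces" list; marquez_url and list_namespaces are hypothetical helper names.

```python
import json
import urllib.request


def marquez_url(path, host="localhost", port=9003):
    """Build a Marquez REST URL; host and port are assumptions matching --api-port 9003."""
    return f"http://{host}:{port}/api/v1/{path.lstrip('/')}"


def list_namespaces(host="localhost", port=9003, timeout=5.0):
    """Return the namespace names known to Marquez (performs a network call)."""
    with urllib.request.urlopen(marquez_url("namespaces", host, port), timeout=timeout) as resp:
        body = json.load(resp)
    return [ns["name"] for ns in body.get("namespaces", [])]


# Only the URL is printed here; call list_namespaces() once the containers are up.
print(marquez_url("namespaces"))
```

If list_namespaces() raises a connection error, the containers are probably still starting or the port mapping differs from the one assumed here.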

Spark with Iceberg

Finally, run the spark-submit command below to submit our Python script. It uses spark.jars.packages to install the openlineage-spark library and spark.extraListeners to register the listener that emits OpenLineage events to the configured transport:

    docker exec -it spark-iceberg /opt/spark/bin/spark-submit \
        --conf spark.jars.packages=io.openlineage:openlineage-spark_2.12:1.23.0 \
        --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
        --conf spark.openlineage.transport.type=http \
        --conf spark.openlineage.transport.url=http://api:9003 \
        --conf spark.openlineage.namespace=spark-with-iceberg \
        spark-with-iceberg.py
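If you would rather keep the OpenLineage settings in one place than retype the flags, the same command can be assembled from a dictionary. This is a sketch only: build_spark_submit and OPENLINEAGE_CONF are hypothetical names, and the -it flags are left out on the assumption that the command may be launched without a terminal attached.

```python
import shlex

# The same five settings used in the spark-submit command above.
OPENLINEAGE_CONF = {
    "spark.jars.packages": "io.openlineage:openlineage-spark_2.12:1.23.0",
    "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
    "spark.openlineage.transport.type": "http",
    "spark.openlineage.transport.url": "http://api:9003",
    "spark.openlineage.namespace": "spark-with-iceberg",
}


def build_spark_submit(script, conf, container="spark-iceberg",
                       spark_submit="/opt/spark/bin/spark-submit"):
    """Return the docker exec + spark-submit command as an argument list."""
    cmd = ["docker", "exec", container, spark_submit]
    for key, value in conf.items():
        # Each setting becomes its own "--conf key=value" pair.
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    return cmd


cmd = build_spark_submit("spark-with-iceberg.py", OPENLINEAGE_CONF)
print(shlex.join(cmd))
```

The resulting list can be passed to subprocess.run(cmd, check=True). Changing the namespace or the transport URL then means editing one dictionary entry.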

Putting it All Together

After running the spark-submit command, you will see the following lineage graph in Marquez, with sf/waymo as the Iceberg dataset:

Figure: Lineage Graph

You can also see the OpenLineage facets for the dataset:

    {
      "schema": {
        "fields": [
          { "name": "vendor_id", "type": "long" },
          { "name": "trip_id", "type": "long" },
          { "name": "trip_distance", "type": "float" },
          { "name": "fare_amount", "type": "double" },
          { "name": "store_and_fwd_flag", "type": "string" }
        ],
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
      },
      "dataSource": {
        "uri": "s3://warehouse",
        "name": "s3://warehouse",
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet"
      },
      "symlinks": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
        "identifiers": [
          { "name": "sf.waymo", "type": "TABLE", "namespace": "http://rest:8181" }
        ]
      },
      "storage": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/StorageDatasetFacet.json#/$defs/StorageDatasetFacet",
        "fileFormat": "parquet",
        "storageLayer": "iceberg"
      },
      "version": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasetVersionDatasetFacet.json#/$defs/DatasetVersionDatasetFacet",
        "datasetVersion": "8889658338673680701"
      }
    }
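The facets are plain JSON, so they are easy to inspect programmatically. The sketch below parses a trimmed copy of the facets above (with the _producer and _schemaURL keys omitted for brevity) and pulls out the columns, the catalog table identifiers, and the storage details. The summarize helper is a hypothetical name, not part of OpenLineage or Marquez.

```python
import json

# Trimmed copy of the dataset facets shown above.
FACETS_JSON = """
{
  "schema": {
    "fields": [
      {"name": "vendor_id", "type": "long"},
      {"name": "trip_id", "type": "long"},
      {"name": "trip_distance", "type": "float"},
      {"name": "fare_amount", "type": "double"},
      {"name": "store_and_fwd_flag", "type": "string"}
    ]
  },
  "symlinks": {
    "identifiers": [
      {"name": "sf.waymo", "type": "TABLE", "namespace": "http://rest:8181"}
    ]
  },
  "storage": {"fileFormat": "parquet", "storageLayer": "iceberg"},
  "version": {"datasetVersion": "8889658338673680701"}
}
"""


def summarize(facets):
    """Collect the parts of the dataset facets most useful for a quick check."""
    schema = facets.get("schema", {})
    symlinks = facets.get("symlinks", {})
    storage = facets.get("storage", {})
    return {
        # Column name and type pairs, in schema order.
        "columns": [(f["name"], f["type"]) for f in schema.get("fields", [])],
        # Logical table names as "namespace/name", taken from the symlinks facet.
        "tables": [f'{s["namespace"]}/{s["name"]}' for s in symlinks.get("identifiers", [])],
        "file_format": storage.get("fileFormat"),
        "storage_layer": storage.get("storageLayer"),
        "version": facets.get("version", {}).get("datasetVersion"),
    }


summary = summarize(json.loads(FACETS_JSON))
for key, value in summary.items():
    print(f"{key}: {value}")
```

A check like this can be handy in a test that asserts the emitted schema still matches the columns defined in spark-with-iceberg.py.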