Simplify Data Observability with OpenLineage

WL
AUTHOR
Willy Lulciuc
Published on:

To demonstrate how simple Data Observability with OpenLineage can be, we'll use an existing example iceberg table demo.nyc.taxis used in the iceberg quickstart guide for spark. Our team is based in SF, so naturally, we'll be using demo.sf.waymo.

Below, Marquez will be used to visualize the data lineage graph. We also recommend following our spark tutorial for oleander.

Create spark-with-iceberg.py

Fist, let's define a simple py script that will run commands on our iceberg table in spark. Make sure you mount spark-with-iceberg.py as a volume onto the spark-iceberg container.

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType

sc = SparkContext('local')
spark = SparkSession(sc)

# (1) Creating 'demo' iceberg table
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.sf.waymo").create()

# (2) Write to 'dataforai' iceberg table
schema = spark.table("demo.sf.waymo").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
  ]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.sf.waymo").append()

# (3) Read from 'demo' iceberg table
df = spark.table("demo.sf.waymo").show()

Docker-Compose

Next, we need to modify marquez/docker-compose.yml to use the tabulario/spark-iceberg as outlined here in the quickstart guide for iceberg. Then, run:

$ ./docker/up.sh --build \
  --api-port 9003 \
  --api-admin-port 9004 \
  --web-port 3001

Spark with Iceberg

Finally, run the spark-submit command below to submit our py script using spark.jars.packages to install openlineage-spark lib. and spark.extraListeners that will emit OpenLineage events to the defined transport:

docker exec -it spark-iceberg /opt/spark/bin/spark-submit \
  --conf spark.jars.packages=io.openlineage:openlineage-spark_2.12:1.23.0 \
  --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
  --conf spark.openlineage.transport.type=http \
  --conf spark.openlineage.transport.url=http://api:9003 \
  --conf spark.openlineage.namespace=spark-with-iceberg \
  spark-with-iceberg.py

Putting it All Together

After running the spark-submit command, you will see the following lineage graph in Marquez:

image

with, sf/waymo as the iceberg dataset. You can also see the OpenLineage facets for the dataset:

{
  "schema": {
    "fields": [
      { "name": "vendor_id", "type": "long" },
      { "name": "trip_id", "type": "long" },
      { "name": "trip_distance", "type": "float" },
      { "name": "fare_amount", "type": "double" },
      { "name": "store_and_fwd_flag", "type": "string" }
    ],
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
  },
  "dataSource": {
    "uri": "s3://warehouse",
    "name": "s3://warehouse",
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet"
  },
  "symlinks": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
    "identifiers": [
      {
        "name": "sf.waymo",
        "type": "TABLE",
        "namespace": "http://rest:8181"
      }
    ]
  },
  "storage": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/StorageDatasetFacet.json#/$defs/StorageDatasetFacet",
    "fileFormat": "parquet",
    "storageLayer": "iceberg"
  },
  "version": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasetVersionDatasetFacet.json#/$defs/DatasetVersionDatasetFacet",
    "datasetVersion": "8889658338673680701"
  }
}