Simplify Data Observability with OpenLineage
To demonstrate how simple data observability with OpenLineage can be, we'll start from the demo.nyc.taxis Iceberg table used in the Iceberg quickstart guide for Spark. Our team is based in SF, so naturally, we'll be using demo.sf.waymo instead.
We'll use Marquez to visualize the data lineage graph. We also recommend following our Spark tutorial for oleander.
Create spark-with-iceberg.py
First, let's define a simple Python script that runs commands against our Iceberg table in Spark. Make sure you mount spark-with-iceberg.py as a volume onto the spark-iceberg container.
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType

# The master, the 'demo' Iceberg catalog and the OpenLineage listener settings
# come from spark-submit and the container's Spark defaults.
spark = SparkSession.builder.getOrCreate()

# (1) Create the empty demo.sf.waymo Iceberg table with an explicit schema.
# create() fails if the table already exists; drop it (or use createOrReplace()) before re-running.
schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True)
])
df = spark.createDataFrame([], schema)
df.writeTo("demo.sf.waymo").create()

# (2) Append a few rows to the demo.sf.waymo table, reusing the table's own schema.
schema = spark.table("demo.sf.waymo").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.sf.waymo").append()

# (3) Read the demo.sf.waymo table back; show() prints the rows (it returns None).
spark.table("demo.sf.waymo").show()
Docker Compose
Next, modify marquez/docker-compose.yml to use the tabulario/spark-iceberg image, as outlined in the Iceberg quickstart guide for Spark. Then, from the root of the Marquez repository, run:
$ ./docker/up.sh --build \
--api-port 9003 \
--api-admin-port 9004 \
--web-port 3001
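Before submitting the Spark job, it can help to confirm that the Marquez API is reachable on the port passed to up.sh. Below is a minimal sketch using only the Python standard library. It assumes the API is published on the host at localhost:9003 and that Marquez's `GET /api/v1/namespaces` endpoint returns a JSON object with a `namespaces` list; the helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: the Marquez API is published on the host at the --api-port given to up.sh.
MARQUEZ_URL = "http://localhost:9003"


def namespaces_url(base_url: str) -> str:
    """Build the URL of Marquez's namespace listing endpoint (hypothetical helper)."""
    return base_url.rstrip("/") + "/api/v1/namespaces"


def list_namespaces(base_url: str = MARQUEZ_URL, timeout: float = 5.0) -> list:
    """Return the namespace names Marquez knows about; raises URLError if the API is unreachable."""
    with urllib.request.urlopen(namespaces_url(base_url), timeout=timeout) as resp:
        body = json.load(resp)
    return [ns["name"] for ns in body.get("namespaces", [])]
```

Once the Spark job below has run, calling `list_namespaces()` should also include the spark-with-iceberg namespace configured via spark.openlineage.namespace.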
Spark with Iceberg
Finally, run the spark-submit command below to submit our Python script. spark.jars.packages pulls in the openlineage-spark library, and spark.extraListeners registers the OpenLineageSparkListener, which emits OpenLineage events to the configured transport:
docker exec -it spark-iceberg /opt/spark/bin/spark-submit \
--conf spark.jars.packages=io.openlineage:openlineage-spark_2.12:1.23.0 \
--conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
--conf spark.openlineage.transport.type=http \
--conf spark.openlineage.transport.url=http://api:9003 \
--conf spark.openlineage.namespace=spark-with-iceberg \
spark-with-iceberg.py
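With transport.type=http, the listener sends each OpenLineage RunEvent as JSON in a POST request to the configured URL, which for Marquez is the /api/v1/lineage endpoint. To smoke-test that endpoint without Spark, here is a minimal sketch that builds and posts a bare-bones RunEvent. It assumes Marquez is reachable from the host at localhost:9003; the producer URI and any job or dataset names you pass in are hypothetical placeholders, not what the Spark integration actually emits.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Assumption: Marquez's lineage endpoint as seen from the host.
LINEAGE_URL = "http://localhost:9003/api/v1/lineage"
# Hypothetical producer URI identifying this manual test client.
PRODUCER = "https://example.com/hypothetical-manual-producer"
RUN_EVENT_SCHEMA = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"


def make_run_event(event_type, job_namespace, job_name, outputs, run_id=None):
    """Build a minimal OpenLineage RunEvent as a plain dict.

    outputs is a list of (dataset_namespace, dataset_name) pairs.
    """
    return {
        "eventType": event_type,  # e.g. "START" or "COMPLETE"
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": PRODUCER,
        "schemaURL": RUN_EVENT_SCHEMA,
    }


def post_event(event, url=LINEAGE_URL, timeout=5.0):
    """POST one event as JSON, as an HTTP transport would; returns the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

A START and a COMPLETE event that share the same runId describe a single run. Posting them into a separate test namespace keeps manual events apart from the ones the Spark job emits.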
Putting it All Together
After running the spark-submit command, you will see the lineage graph in Marquez, with sf/waymo as the Iceberg dataset. You can also inspect the OpenLineage facets for the dataset:
{
"schema": {
"fields": [
{ "name": "vendor_id", "type": "long" },
{ "name": "trip_id", "type": "long" },
{ "name": "trip_distance", "type": "float" },
{ "name": "fare_amount", "type": "double" },
{ "name": "store_and_fwd_flag", "type": "string" }
],
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
},
"dataSource": {
"uri": "s3://warehouse",
"name": "s3://warehouse",
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet"
},
"symlinks": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
"identifiers": [
{
"name": "sf.waymo",
"type": "TABLE",
"namespace": "http://rest:8181"
}
]
},
"storage": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/StorageDatasetFacet.json#/$defs/StorageDatasetFacet",
"fileFormat": "parquet",
"storageLayer": "iceberg"
},
"version": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasetVersionDatasetFacet.json#/$defs/DatasetVersionDatasetFacet",
"datasetVersion": "8889658338673680701"
}
}
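Because facets are plain JSON, they are easy to consume programmatically. For example, comparing the schema facet across two versions of a dataset is a simple way to flag schema drift. Below is a minimal sketch, assuming the facets dict has the shape shown above. The second schema (`HYPOTHETICAL_NEXT`) is invented purely to illustrate a change and is not output from this demo.

```python
# Trimmed copy of the schema facet shown above.
FACETS = {
    "schema": {
        "fields": [
            {"name": "vendor_id", "type": "long"},
            {"name": "trip_id", "type": "long"},
            {"name": "trip_distance", "type": "float"},
            {"name": "fare_amount", "type": "double"},
            {"name": "store_and_fwd_flag", "type": "string"},
        ]
    },
    "storage": {"fileFormat": "parquet", "storageLayer": "iceberg"},
}

# Hypothetical later version: fare_amount retyped, one column dropped, one added.
HYPOTHETICAL_NEXT = {
    "schema": {
        "fields": [
            {"name": "vendor_id", "type": "long"},
            {"name": "trip_id", "type": "long"},
            {"name": "trip_distance", "type": "float"},
            {"name": "fare_amount", "type": "float"},
            {"name": "tip_amount", "type": "double"},
        ]
    }
}


def columns(facets):
    """Map column name -> type from a dataset's schema facet (empty if the facet is absent)."""
    return {f["name"]: f.get("type") for f in facets.get("schema", {}).get("fields", [])}


def schema_diff(old_facets, new_facets):
    """Report columns added, removed, or retyped between two schema facets."""
    old, new = columns(old_facets), columns(new_facets)
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "retyped": sorted(c for c in old.keys() & new.keys() if old[c] != new[c]),
    }
```

Here `schema_diff(FACETS, HYPOTHETICAL_NEXT)` reports tip_amount as added, store_and_fwd_flag as removed, and fare_amount as retyped. An empty diff means the schema is unchanged.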