Here is the article’s outline:
- Bird’s-eye view of approaches to obtaining lineage information
- Solution with Spark-3 plans
- Notes on coding
How to get data lineage
A data lineage can be visualized as a graph describing the origins and descendants of data. It shows where a dataset comes from and which downstream data it produces. As such, it serves as a vehicle for data governance.
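To make the graph picture concrete, here is a minimal sketch of a lineage graph held in memory. The class name LineageGraph, its methods, and the use of plain strings as dataset names are assumptions made for illustration only; a real lineage store would be backed by a database.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory lineage graph; datasets are identified by plain strings.
class LineageGraph {
    // source dataset -> datasets produced directly from it
    private final Map<String, Set<String>> downstream = new HashMap<>();
    // dataset -> datasets it was produced directly from
    private final Map<String, Set<String>> upstream = new HashMap<>();

    // Records that `target` is derived from `source`.
    void addEdge(String source, String target) {
        downstream.computeIfAbsent(source, k -> new LinkedHashSet<>()).add(target);
        upstream.computeIfAbsent(target, k -> new LinkedHashSet<>()).add(source);
    }

    // All direct and transitive origins of a dataset.
    Set<String> origins(String dataset) {
        return reach(dataset, upstream);
    }

    // All direct and transitive descendants of a dataset.
    Set<String> descendants(String dataset) {
        return reach(dataset, downstream);
    }

    // Breadth-first walk along the given edge map, starting from `start`.
    private static Set<String> reach(String start, Map<String, Set<String>> edges) {
        Set<String> seen = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.<String>emptySet())) {
                if (seen.add(next)) {
                    queue.add(next);
                }
            }
        }
        return seen;
    }
}
```

Each job run would contribute one addEdge call per input and output pair. Calling origins on a report dataset then answers "where does this come from", and descendants on a raw dataset answers "what does this feed".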
There are at least two ways to collect lineage information: templates and Spark listeners. With templates, a data platform requires users to declare the inputs and the output at the top of a code window before writing the data-processing code there. The input and output data paths are then captured centrally in a database. Any change in the lineage can easily be updated in the database and shown in the UI. The template can accommodate both Spark and non-Spark jobs, so Pandas code, for instance, can be accounted for. Just as important, the code can be presented together with the lineage in the UI because the two are saved together.
Using Spark listeners is likely to be less flexible. In this case we create a package, call it a Spark agent, and include it as a listener thread that keeps track of events during the execution of the Spark job. The listener runs in the Spark driver. The events carry the Spark plans, a vital ingredient for lineage. The listener then sends the collected lineage data to storage through an API. Clearly this only works with the languages Spark supports, such as Scala, Python and Java, and it cannot cover Pandas. We also have to deal with network connectivity and with updating lineage through the API. Pairing the lineage data with its processing code in the UI is likely to be harder too, since the former arrives via the API separately from the latter.
Now let’s discuss the employment of the Spark listener.
Spark job event listener
To create the listener, simply extend the QueryExecutionListener interface and override its methods.
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class LineageListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // implementation
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    // implementation
  }
}
QueryExecution carries the minimum information needed to extract the lineage. However, if you want to receive more metadata and do additional preparation and cleanup at the start and end of the query execution, SparkListener can meet that need.
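import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;

public class LineageListener extends SparkListener {
    @Override
    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        // implementation
    }

    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        // implementation
    }

    @Override
    public void onOtherEvent(SparkListenerEvent event) {
        // implementation
    }
}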
Note that we use the class SparkListenerEvent to get to the lineage details. This takes a detour: from SparkListenerEvent to SparkListenerSQLExecutionStart or SparkListenerSQLExecutionEnd, and finally to QueryExecution. You can skip the middle step if you don’t need finer-grained control over the stage of the query execution.
QueryExecution is the key player since it provides access to the abstract class LogicalPlan, which exposes the input-output lineage information. As a refresher, LogicalPlan represents the logical operations needed to evaluate the query, and Spark SQL uses it to generate the physical plan, i.e. the SparkPlan object, which lays out the physical RDD operations that can be executed on a cluster.
LogicalPlan has many subclasses, and the one we see at runtime depends on the stage at which the logical plan is built, the operations applied to the data, and the type of Spark action.
These operations can be categorized as: dataset operations, RDD transformations, SQL dialect, and DDL operations.
As for Spark actions, we may concern ourselves only with persistent ones, e.g. write, and ignore in-memory actions such as collect() and show().
The root class LogicalPlan has three important immediate children: Command, LeafNode and LogicalLeafNode. Below are the classes directly relevant to the lineage information and their hierarchical relation with LogicalPlan. The list isn’t exhaustive.
- InsertIntoHadoopFsRelationCommand < DataWritingCommand < Command < LogicalPlan
- SaveIntoDataSourceCommand < LeafRunnableCommand < RunnableCommand < Command < LogicalPlan
- CreateDataSourceTableAsSelectCommand < DataWritingCommand < UnaryCommand < Command < LogicalPlan
- CreateHiveTableAsSelectCommand < CreateHiveTableAsSelectBase < DataWritingCommand < UnaryCommand < Command < LogicalPlan
- InsertIntoHiveTable < SaveAsHiveFile < DataWritingCommand < UnaryCommand < Command < LogicalPlan
- LogicalRelation < LeafNode < LogicalPlan
- InMemoryRelation < LeafNode < LogicalPlan
- HiveTableRelation < LeafNode < LogicalPlan
If the LogicalPlan turns out to be a LogicalRelation, we call its relation() accessor to get an object of type BaseRelation. At runtime, that BaseRelation may be one of the following subclasses:
- HadoopFsRelation
- JDBCRelation
BaseRelation is an interface for interacting with various types of data sources.
Similarly, there are cases where Spark physical plans act as an intermediate step. For instance, InMemoryRelation gives access to specific runtime implementations of SparkPlan, from which runtime implementations of BaseRelation are extracted. Those in turn lead to runtime subclasses of LogicalPlan. We take this route because SparkPlan exposes more of the LogicalPlan subclasses we are after, as a Spark plan can have multiple inputs and outputs.
Here are a few frequently seen implementations of SparkPlan.
- FileSourceScanExec < DataSourceScanExec < LeafExecNode < SparkPlan
- RowDataSourceScanExec < DataSourceScanExec < LeafExecNode < SparkPlan
- HiveTableScanExec < LeafExecNode < SparkPlan
- InMemoryTableScanExec < LeafExecNode < SparkPlan
Here is the diagram that we can traverse to retrieve lineage information.
The diagram doesn’t claim to be comprehensive. Spark Structured Streaming, for example, may require new traversal paths not introduced earlier.
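The idea of walking a plan tree from a write command at the root down to the relations at the leaves can be sketched without Spark on the classpath. The PlanNode and PlanWalker classes below are stand-ins invented for this illustration; they are not Spark's LogicalPlan or SparkPlan, and the paths are made up. With real plans, the leaf check would instead test for classes such as LogicalRelation or HiveTableRelation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in plan node (NOT Spark's LogicalPlan): a label, an optional
// data location, and zero or more child nodes.
class PlanNode {
    final String kind;       // e.g. "WriteCommand", "Join", "Relation"
    final String location;   // data path or table name; null for pure operators
    final List<PlanNode> children;

    PlanNode(String kind, String location, PlanNode... children) {
        this.kind = kind;
        this.location = location;
        this.children = Arrays.asList(children);
    }
}

class PlanWalker {
    // Depth-first walk that collects the locations found on leaf nodes,
    // which play the role of the job's inputs.
    static List<String> collectInputs(PlanNode root) {
        List<String> inputs = new ArrayList<>();
        walk(root, inputs);
        return inputs;
    }

    private static void walk(PlanNode node, List<String> inputs) {
        if (node.children.isEmpty()) {
            if (node.location != null) {
                inputs.add(node.location);
            }
            return;
        }
        for (PlanNode child : node.children) {
            walk(child, inputs);
        }
    }

    public static void main(String[] args) {
        // A write at the root, fed by a join of two relations.
        PlanNode plan = new PlanNode("WriteCommand", "warehouse/report",
            new PlanNode("Join", null,
                new PlanNode("Relation", "warehouse/orders"),
                new PlanNode("Relation", "warehouse/customers")));
        System.out.println("output = " + plan.location);
        System.out.println("inputs = " + PlanWalker.collectInputs(plan));
    }
}
```

In this toy model the output is read off the root command and the inputs are read off the leaves, which mirrors the Command and LeafNode split in the class lists above.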
Notes on coding
We need a class to standardize the information gained from the various classes above (called X_i). This means the common class can be extended by different lineage-capture classes, for example one for HiveTableRelation and another for SaveIntoDataSourceCommand. Symbolically, the common capture class C has the children C1, C2, C3.
In terms of code implementation, we may write
if X_i then f_i() and return C_i
This tends to repeat the same pattern and spawn many f_i() functions. Instead, one can build a map M whose keys are the X_i and whose values are interfaces, each with only one method (for instance build()) that returns an extension of the common class C.
When creating the map M, we supply the keys along with lambda functions that play the role of f_i(). This removes the need to define explicit f_i() functions. Now we can rewrite the above snippet as
if M.get(runtime instance type of LogicalPlan) then M.get(runtime instance type of LogicalPlan).build()
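A minimal Java sketch of this map-based dispatch follows. Every type in it is a placeholder assumed for illustration: HiveTableNode and SaveCommandNode stand in for two of the X_i (they are not Spark classes), Lineage is the common class C, and InputLineage and OutputLineage are two of its children C_i.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-ins for two plan types X_i (hypothetical, not Spark's classes).
class HiveTableNode {
    final String table;
    HiveTableNode(String table) { this.table = table; }
}

class SaveCommandNode {
    final String path;
    SaveCommandNode(String path) { this.path = path; }
}

// Common capture class C and two of its children C_i.
abstract class Lineage {
    final String location;
    Lineage(String location) { this.location = location; }
}

class InputLineage extends Lineage {
    InputLineage(String location) { super(location); }
}

class OutputLineage extends Lineage {
    OutputLineage(String location) { super(location); }
}

// Single-method interface stored as the map value.
interface LineageBuilder {
    Lineage build(Object plan);
}

class LineageRegistry {
    // The map M: runtime plan class -> lambda playing the role of f_i().
    private static final Map<Class<?>, LineageBuilder> M = new HashMap<>();

    static {
        M.put(HiveTableNode.class, p -> new InputLineage(((HiveTableNode) p).table));
        M.put(SaveCommandNode.class, p -> new OutputLineage(((SaveCommandNode) p).path));
    }

    // Looks up the builder by the plan's runtime class; empty if none is registered.
    static Optional<Lineage> capture(Object plan) {
        return Optional.ofNullable(M.get(plan.getClass())).map(b -> b.build(plan));
    }
}
```

Supporting a new plan type then means adding one M.put line rather than another if branch. One caveat of this sketch: the lookup uses the exact runtime class, so a subclass of a registered type is not matched unless it is registered too.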
I hope the ideas presented here are useful to you.