freezing-garage-69869
12/07/2022, 8:50 AM
--packages io.acryl:datahub-spark-lineage:0.9.3-1
--conf spark.extraListeners=datahub.spark.DatahubSparkListener
--conf spark.datahub.rest.server=http://10.5.0.37:8080
My DataHub server (version 0.9.3-1) is running on Docker, started with datahub docker quickstart.
During the execution of my Spark job I get the following error from DatahubSparkListener (I removed the detailed Spark plans from the log):
22/12/06 17:31:09 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataFlow:(spark,ModelizeBankAccountEvaluations,yarn)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 06 Dec 2022 17:31:09 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 71, Server: Jetty(9.4.46.v20220331)] [Content-Length: 71,Chunked: false])
22/12/06 17:31:10 ERROR DatahubSparkListener: java.lang.NullPointerException
at datahub.spark.DatasetExtractor.lambda$static$6(DatasetExtractor.java:147)
at datahub.spark.DatasetExtractor.asDataset(DatasetExtractor.java:237)
at datahub.spark.DatahubSparkListener$SqlStartTask.run(DatahubSparkListener.java:114)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1447)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
22/12/06 17:31:10 INFO AsyncEventQueue: Process of event SparkListenerSQLExecutionStart(0,save at NativeMethodAccessorImpl.java:0,org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)
== Parsed Logical Plan ==
== Analyzed Logical Plan ==
SaveIntoDataSourceCommand org.apache.hudi.Spark3DefaultSource@6b5aa9cf, ...
== Optimized Logical Plan ==
SaveIntoDataSourceCommand org.apache.hudi.Spark3DefaultSource@6b5aa9cf...
== Physical Plan ==
Execute SaveIntoDataSourceCommand
+- SaveIntoDataSourceCommand org.apache.hudi.Spark3DefaultSource@6b5aa9cf...
by listener DatahubSparkListener took 2.295850812s.
I think it’s worth pointing out that I use the Apache Hudi format to write the data.
Is there something I’m missing here?
Thanks for your help
microscopic-mechanic-13766
12/07/2022, 12:52 PM
PLAN_TO_DATASET.put(SaveIntoDataSourceCommand.class, (p, ctx, datahubConfig) -> {
  SaveIntoDataSourceCommand cmd = (SaveIntoDataSourceCommand) p;
  Map<String, String> options = JavaConversions.mapAsJavaMap(cmd.options());
  String url = options.get("url"); // e.g. jdbc:postgresql://localhost:5432/sparktestdb
  if (!url.contains("jdbc")) {
    return Optional.empty();
  }
freezing-garage-69869
12/07/2022, 1:11 PM
SaveIntoDataSourceCommand org.apache.hudi.Spark3DefaultSource@6b5aa9cf
(I have simply changed the S3 URL to a fake bucket name and path):
SaveIntoDataSourceCommand org.apache.hudi.Spark3DefaultSource@6b5aa9cf, Map(hoodie.datasource.hive_sync.database -> data_lake_gold_history, hoodie.combine.before.insert -> true, hoodie.datasource.hive_sync.mode -> hms, hoodie.schema.on.read.enable -> true, path -> s3://my-bucket/my/path/, hoodie.datasource.write.precombine.field -> updated_at, hoodie.datasource.write.operation -> bulk_insert, hoodie.datasource.hive_sync.enable -> true, hoodie.datasource.write.recordkey.field -> _history_id, hoodie.table.name -> bank_account_evaluations, hoodie.table.type -> COPY_ON_WRITE, hoodie.datasource.write.table.name -> bank_account_evaluations, hoodie.combine.before.upsert -> true), Overwrite
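Taken together, the two snippets above pin down the failure: the quoted SaveIntoDataSourceCommand branch of DatasetExtractor assumes a JDBC-style source whose options include a "url" entry, while the Hudi options map carries only "path" and "hoodie.*" keys. options.get("url") therefore returns null, and url.contains("jdbc") throws the NullPointerException seen at DatasetExtractor.java:147. A minimal null-safe guard might look like the sketch below (an illustration against the quoted snippet, not the actual upstream patch):

PLAN_TO_DATASET.put(SaveIntoDataSourceCommand.class, (p, ctx, datahubConfig) -> {
  SaveIntoDataSourceCommand cmd = (SaveIntoDataSourceCommand) p;
  Map<String, String> options = JavaConversions.mapAsJavaMap(cmd.options());
  String url = options.get("url");
  // Non-JDBC writes (e.g. Hudi) have "path"/"hoodie.*" options but no "url"
  // entry, so guard the lookup instead of dereferencing null.
  if (url == null || !url.contains("jdbc")) {
    return Optional.empty();
  }
  // ... JDBC handling continues as in the original snippet ...
});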
wonderful-coat-54946
07/25/2023, 3:37 PM
spark-submit \
--packages org.apache.hadoop:hadoop-aws:3.2.3,io.delta:delta-core_2.12:2.4.0,io.acryl:datahub-spark-lineage:0.10.5-2rc7 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.datahub.rest.server=<http://localhost:8080>" \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
spark-code/delta_join.py
The file delta_join.py looks like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark Test") \
    .master("local[*]") \
    .getOrCreate()
df1 = spark.read.format("delta").load("./delta-table/table1")
df2 = spark.read.format("delta").load("./delta-table/table2")
joinedDF = df1.join(df2, ["id"])
joinedDF.write.format("delta").save("delta-table/table_join_lineage")
But I am getting errors; the logs are below:
23/07/25 22:01:42 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
23/07/25 22:01:42 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Exception in thread "spark-listener-group-shared" java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
23/07/25 22:01:42 INFO SparkContext: SparkContext is stopping with exitCode 0.
23/07/25 22:01:42 INFO SparkUI: Stopped Spark web UI at http://192.168.0.108:4040
23/07/25 22:01:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/25 22:01:42 INFO MemoryStore: MemoryStore cleared
23/07/25 22:01:42 INFO BlockManager: BlockManager stopped
23/07/25 22:01:42 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/25 22:01:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/25 22:01:42 INFO SparkContext: Successfully stopped SparkContext
Traceback (most recent call last):
File "/home/leo/Projects/DataHub-LakeHouse/spark-code/delta_join.py", line 16, in <module>
df1 = spark.read.format("delta").load("./delta-table/table1")
File "/opt/spark-3.4.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 300, in load
File "/opt/spark-3.4.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/spark-3.4.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
File "/opt/spark-3.4.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.IllegalStateException: SparkContext has been shutdown
How can I fix it? Thank you.
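For context on the NoSuchMethodError: Spark 3.4 reworked org.apache.spark.util.JsonProtocol as part of its json4s-to-Jackson migration, replacing sparkEventToJson(event), which returned a json4s JValue, with sparkEventToJsonString(event), which returns a String. A listener jar compiled against Spark 3.3 or earlier therefore fails to link on Spark 3.4.x, which matches the setup above (Spark 3.4.0, per the /opt/spark-3.4.0-bin-hadoop3 paths, with datahub-spark-lineage). A small reflection probe (a diagnostic sketch, assuming the Spark jars are on the classpath; not part of the DataHub client) reports which signature the running Spark actually exposes:

import java.lang.reflect.Method;

// Diagnostic sketch: list the JsonProtocol event-serialization methods the
// running Spark version exposes. Spark <= 3.3 has sparkEventToJson returning
// a json4s JValue; Spark >= 3.4 exposes sparkEventToJsonString returning String.
public final class JsonProtocolProbe {
  public static void main(String[] args) throws Exception {
    Class<?> jsonProtocol = Class.forName("org.apache.spark.util.JsonProtocol");
    for (Method m : jsonProtocol.getMethods()) {
      if (m.getName().startsWith("sparkEventToJson")) {
        System.out.println(m); // prints the exact available signature
      }
    }
  }
}

On Spark 3.4.0 this should list only the String-returning variant, consistent with the NoSuchMethodError above.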
acoustic-lawyer-52945
09/08/2023, 2:49 PM
Uncaught exception in thread spark-listener-group-shared! java.lang.NoSuchMethodError:….
Full message (similar, but we are using Spark through Databricks):
23/09/08 13:28:13 INFO AsyncEventQueue: Process of event SparkListenerSQLExecutionStart(executionId=0, ...) by listener DatahubSparkListener took 1.996030788s.
23/09/08 13:28:13 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1655)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/09/08 13:28:13 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1655)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/09/08 13:28:13 INFO HiveServer2: Shutting down HiveServer2
23/09/08 13:28:13 ERROR DatabricksMain$DBUncaughtExceptionHandler: Uncaught exception in thread spark-listener-group-shared!
java.lang.NoSuchMethodError: org.apache.spark.util.JsonProtocol.sparkEventToJson(Lorg/apache/spark/scheduler/SparkListenerEvent;)Lorg/json4s/JsonAST$JValue;
at datahub.spark.DatahubSparkListener$SqlStartTask.<init>(DatahubSparkListener.java:87)
at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:350)
at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:262)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1655)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
We’re on DBR 12.2, Spark 3.2, which should have JsonProtocol.sparkEventToJson available.
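A closing note on the Databricks case: Databricks runtimes ship a forked Spark, so the open-source version matrix is not a reliable guide to which JsonProtocol methods exist on a cluster (and, per the Databricks release notes, DBR 12.2 LTS bundles Spark 3.3.2 rather than 3.2). The reflection probe sketched earlier, run in a notebook cell, would confirm which signature the runtime actually exposes.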