# troubleshoot
thankful-morning-85093
Hi Team, I am trying to integrate a PySpark notebook with DataHub. I am using Spark on k8s under the hood. It works when I use spark-submit, but not when I use notebooks. I am getting this error:
```
22/08/13 20:54:48 ERROR DatahubSparkListener: java.lang.NullPointerException
```
Even though I get this error, the command completes execution without pushing any data to DataHub. I'm using Spark 3.3.0. Any help appreciated. Thanks.
l
@careful-pilot-86309 ^
careful-pilot-86309
@thankful-morning-85093 Can you please provide the complete logs with debug enabled? Each type of deployment produces Spark events a little differently, and we have not tested with k8s deployments. A complete log trace will help me understand the sequence of events.
thankful-morning-85093
```
22/08/16 05:17:17 ERROR DatahubSparkListener: java.lang.NullPointerException
	at datahub.spark.DatahubSparkListener$SqlStartTask.run(DatahubSparkListener.java:84)
	at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:323)
	at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:237)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
```
I set these:
```
spark.jars.packages
spark.extraListeners
spark.datahub.rest.server
spark.datahub.rest.token
```
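For reference, a minimal sketch of wiring these up on a notebook SparkSession; the GMS URL and token below are placeholder assumptions, while the listener class and package version come from the logs in this thread:
```python
from pyspark.sql import SparkSession

# Minimal sketch -- the rest server URL and token are placeholders,
# not values from this thread.
spark = (
    SparkSession.builder
    .appName("datahub-lineage-test")
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.41-3-rc3")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://datahub-gms:8080")  # placeholder
    .config("spark.datahub.rest.token", "<redacted-token>")          # placeholder
    .getOrCreate()
)
```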
It works with the spark-submit command.
Thanks for looking into this.
careful-pilot-86309
From the logs it seems that the Spark application start event is missing, but to confirm, please provide all the logs from application start.
thankful-morning-85093
```
athorat-spark
Warning: Ignoring non-Spark config property: mapreduce.input.fileinputformat.input.dir.recursive
Warning: Ignoring non-Spark config property: hive.metastore.uris
:: loading settings :: url = jar:file:/usr/local/spark-3.3.0-bin-spark3-hadoop320/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
io.acryl#datahub-spark-lineage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0f294cfe-1788-4f4a-9342-6afdf2262082;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found io.acryl#datahub-spark-lineage;0.8.41-3-rc3 in central
:: resolution report :: resolve 138ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 from central in [default]
	io.acryl#datahub-spark-lineage;0.8.41-3-rc3 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.2 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0f294cfe-1788-4f4a-9342-6afdf2262082
	confs: [default]
	0 artifacts copied, 4 already retrieved (0kB/3ms)
22/08/16 16:38:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/16 16:38:21 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
22/08/16 16:38:21 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
22/08/16 16:38:21 INFO MetricsSystemImpl: s3a-file-system metrics system started
22/08/16 16:38:22 INFO SparkContext: Added file s3://xxxx-some-internal-lib/libs at s3://xxxx-some-internal-lib/libs with timestamp 1660667902507
22/08/16 16:38:22 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/__init__.py to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/fetchFileTemp1799041977827041966.tmp
22/08/16 16:38:22 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/__init__.pyc to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/fetchFileTemp3988794557929964743.tmp
22/08/16 16:38:22 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/actionid.py to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/fetchFileTemp4110087518203190736.tmp
22/08/16 16:38:26 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/test/configloader_test.py to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/test/fetchFileTemp5057883963867047943.tmp
22/08/16 16:38:26 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/test/resources/import_sample.json to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/test/resources/fetchFileTemp7776098277787930261.tmp
22/08/16 16:38:26 INFO Utils: Fetching s3://xxxx-some-internal-lib/libs/test/resources/master.json to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/libs/test/resources/fetchFileTemp3628931421824204763.tmp
22/08/16 16:38:26 INFO SparkContext: Added file /var/run/pyspark/mlp_load_profiles.py at spark://athorat-notebook-test-service.data-platform-jupyterlab.svc.cluster.local:2222/files/mlp_load_profiles.py with timestamp 1660667906213
22/08/16 16:38:26 INFO Utils: Copying /run/pyspark/mlp_load_profiles.py to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/mlp_load_profiles.py
22/08/16 16:38:26 INFO SparkContext: Added file /var/run/pyspark/mlpuserprofile_pb2.py at spark://athorat-notebook-test-service.data-platform-jupyterlab.svc.cluster.local:2222/files/mlpuserprofile_pb2.py with timestamp 1660667906218
22/08/16 16:38:26 INFO Utils: Copying /run/pyspark/mlpuserprofile_pb2.py to /tmp/spark-d726abc6-8791-43c1-87fa-64c2674edcc1/userFiles-73bd34aa-b2b6-4a06-a465-173f568b5688/mlpuserprofile_pb2.py
```
Those are the Spark context initialization logs.
Then when I run df.show(), I get the error I pasted above.
careful-pilot-86309
Ideally, we don't process the SQL execution associated with df.show(), as it doesn't produce any persistent data. I am looking for the logs mentioned here. Also, if you enable debug logs, just above your error you should see a line like "SQL Exec start event with id ". I am interested in those. If you can give me such logs, it will be helpful.
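A minimal sketch of one way to surface those DEBUG lines from the notebook, using the setLogLevel hint printed in the startup log (this assumes the session variable is named spark and the DataFrame df, as in the earlier messages):
```python
# Global DEBUG is noisy, but it makes DatahubSparkListener's
# "SQL Exec start event with id ..." lines visible in the driver log.
spark.sparkContext.setLogLevel("DEBUG")

# Re-run the action that triggers the listener, e.g. the df.show()
# from above, then look at the driver log around the NullPointerException.
df.show()
```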
thankful-morning-85093
Ahh ok. Let me pull that
```
22/08/17 04:47:34 INFO CodeGenerator: Code generated in 34.227165 ms
22/08/17 04:47:34 DEBUG DatahubSparkListener: SQL Exec end event with id 0
22/08/17 04:47:34 ERROR DatahubSparkListener: java.lang.NullPointerException
	at datahub.spark.DatahubSparkListener$3.apply(DatahubSparkListener.java:258)
	at datahub.spark.DatahubSparkListener$3.apply(DatahubSparkListener.java:254)
	at scala.Option.foreach(Option.scala:407)
	at datahub.spark.DatahubSparkListener.processExecutionEnd(DatahubSparkListener.java:254)
	at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:241)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
```
This is the end one. Let me also pull the start logs
```
22/08/17 04:47:23 DEBUG DatahubSparkListener: SQL Exec start event with id 0
22/08/17 04:47:23 DEBUG ShuffleQueryStageExec: Materialize query stage ShuffleQueryStageExec: 0
22/08/17 04:47:23 ERROR DatahubSparkListener: java.lang.NullPointerException
	at datahub.spark.DatahubSparkListener$SqlStartTask.run(DatahubSparkListener.java:84)
	at datahub.spark.DatahubSparkListener.processExecution(DatahubSparkListener.java:323)
	at datahub.spark.DatahubSparkListener.onOtherEvent(DatahubSparkListener.java:237)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
```
@careful-pilot-86309 let me know if you need any other logs. I did not paste the entire thing because it's 35k lines and would be a nightmare to redact.
@careful-pilot-86309 Thanks a lot for the updated jar. I was able to capture lineage with Jupyter on Spark on k8s. I got these errors:
```
22/08/18 22:05:52 ERROR DatasetExtractor: class org.apache.spark.sql.catalyst.plans.logical.Project is not supported yet. Please contact datahub team for further support. 
22/08/18 22:05:52 ERROR DatasetExtractor: class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit is not supported yet. Please contact datahub team for further support. 
22/08/18 22:05:52 ERROR DatasetExtractor: class org.apache.spark.sql.catalyst.plans.logical.LocalLimit is not supported yet. Please contact datahub team for further support.
```
These seem like warnings?
careful-pilot-86309
Yes, they're warnings. It should be ok.
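For context, a minimal illustrative sketch (not from this thread) of why those nodes show up: a show()/limit-style query is a limited projection under the hood, and its logical plan contains exactly those Catalyst classes. Since none of them persist data, the extractor skipping them loses no lineage:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A show()-style query is a limited projection under the hood.
df = spark.range(10).selectExpr("id", "id * 2 AS doubled")

# The optimized logical plan printed here contains GlobalLimit,
# LocalLimit, and Project -- the same catalyst node classes the
# DatasetExtractor logs as "not supported". None of them write
# data anywhere, so there is no lineage to extract from them.
df.limit(20).explain(True)
```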