Stefan Krawczyk
03/12/2024, 4:05 AM
ps.DataFrame is a pyspark dataframe, not a pandas on spark one. PySpark dataframes can't be used with extract_columns because there is no index that goes along with the columns.
Stefan Krawczyk
03/12/2024, 4:06 AM
In the pandas_on_spark example you'll see ps is import pyspark.pandas as ps
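For reference, these are the two imports that each get aliased to ps in different examples; the distinct alias names below are used only for illustration:

import pyspark.sql as ps_sql        # ps_sql.DataFrame: a plain PySpark dataframe, no index
import pyspark.pandas as ps_pandas  # ps_pandas.DataFrame: pandas-on-Spark, carries an index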
Stefan Krawczyk
03/12/2024, 4:06 AM
For pandas on spark, they implemented some hidden columns underneath to act as indexes so it can behave like regular pandas.
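A minimal sketch of that hidden index at work (assumes a running SparkSession named spark; pandas_api() is the PySpark 3.2+ spelling, older versions used to_pandas_on_spark()):

import pyspark.pandas as ps  # psdf below is a ps.DataFrame

spark_df = spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])  # plain pyspark: no index
psdf = spark_df.pandas_api()  # pandas-on-Spark wrapper; a default index is attached underneath
print(psdf.index)             # behaves like a regular pandas index
print(psdf["a"] + 1)          # column operations align on that index, pandas-style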
Luke
03/12/2024, 4:41 AM
I was wondering why extract_columns couldn't just run df.select(columns[0]), df.select(columns[1]), etc… creating multiple ps.DataFrame nodes. As you say though, without an index these nodes are useless. Explicitly transforming to pandas_on_spark makes more sense.
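To spell out why those per-column select() nodes can't be recombined (plain pyspark, assuming df is an existing DataFrame with columns a and b):

col_a = df.select("a")  # independent single-column DataFrame
col_b = df.select("b")  # another one, with no shared key
# pyspark rows carry no index and no guaranteed order, so there is no way to
# zip col_a and col_b back into one dataframe without an explicit join key;
# pandas-on-spark's hidden index columns are exactly that missing key.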
Luke
03/12/2024, 4:43 AM
Is the ps here also pyspark.pandas?
Luke
03/12/2024, 4:47 AM
Does that column preserve an index?
Stefan Krawczyk
03/12/2024, 4:47 AM
def foo(bar: ps.DataFrame) -> ps.Column:
    return bar["bar"] + 1
This function is supposed to be used in conjunction with the @with_columns decorator.
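For context, roughly how such a function gets attached to a dataframe with the decorator, sketched from the Hamilton pyspark example; the module and column names here are made up, and apart from columns_to_pass / pass_dataframe_as (which come up below) the exact signature may differ by Hamilton version:

import pyspark.sql as ps
from hamilton.plugins import h_spark

import map_transforms  # hypothetical module containing foo and similar functions

@h_spark.with_columns(
    map_transforms,            # where the column-level functions live
    columns_to_pass=["bar"],   # dataframe columns handed to those functions
)
def enriched_df(base_df: ps.DataFrame) -> ps.DataFrame:
    return base_df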
Luke
03/12/2024, 4:49 AM
Why is it being defined as a ps.Column type but then referenced as a pd.Series type here: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pyspark#columnar-operations
# map_transforms.py
import pandas as pd
import pyspark.sql as ps

# ps.Column type
def column_1_from_dataframe(input_dataframe: ps.DataFrame) -> ps.Column:
    return input_dataframe.column_1_from_dataframe

. . .

# referenced as a pd.Series?
def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return _some_transform(column_1_from_dataframe)
Stefan Krawczyk
03/12/2024, 4:49 AM
> does that column preserve an index?
No index in pyspark. But yes, this is a map transform, so rows would be preserved.
Stefan Krawczyk
03/12/2024, 4:51 AM
> Why is it being defined as a ps.Column type but then referenced as a pd.Series type
yeah so this has to do with UDFs and what we're doing in the background for you.
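A rough sketch of what that background machinery plausibly amounts to; this is not Hamilton's actual code, just the standard pyspark way to run a pd.Series-typed function over a pyspark dataframe (assumes df exists and the column holds longs):

import pandas as pd
import pyspark.sql.functions as F

def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return column_1_from_dataframe + 1

# Wrapped as a pandas UDF, the function receives whole batches of rows as
# pandas Series, while the dataframe itself stays a pyspark DataFrame.
column_3_udf = F.pandas_udf(column_3, returnType="long")
df = df.withColumn("column_3", column_3_udf("column_1_from_dataframe"))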
Luke
03/12/2024, 4:53 AM
> Approach (2) requires functions that take in pyspark dataframes and return pyspark dataframes or columns for the functions reading directly from the dataframe. If you want to stay in pandas entirely for the with_columns group, you should use approach (1).
So could the transformations of the columns (e.g. column_3 above) accept column_1_from_dataframe as a ps.Column or must it be pd.Series because of a behind the scenes conversion? The docs imply approach (2) pass_dataframe_as allows pure PySpark.
Stefan Krawczyk
03/12/2024, 4:57 AM
def column_3(column_1_from_dataframe: ps.Column) -> ps.Column:
    return column_1_from_dataframe + 1
Whether that works I'd have to check the code / run it 😬
Luke
03/12/2024, 4:59 AM
> yeah so this has to do with UDFs and what we're doing in the background for you.
So is pyspark.pandas ultimately being used to execute the transformations or are Pandas UDFs a distinct computation pathway?
Luke
03/12/2024, 4:59 AM
I.e. whether Hamilton is making pyspark.pandas calls or is doing something else.
Stefan Krawczyk
03/12/2024, 5:00 AM
> So is pyspark.pandas ultimately being used to execute the transformations or are Pandas UDFs a distinct computation pathway?
Distinct computation path.
Stefan Krawczyk
03/12/2024, 5:01 AM
> pyspark.pandas calls or is doing something else.
Pandas UDFs predate pandas on spark coming into mainline pyspark IIRC. So I think they're quite separate.
Luke
03/12/2024, 5:04 AM
So the expected performance ordering would be spark.sql.Column > spark.pandas.Column > pandas.Series (although these are being converted to Pandas UDFs)? This is the sort of performance rank I'd expect without Hamilton.
Stefan Krawczyk
03/12/2024, 5:08 AM
def col_c(col_a: int, col_b: int) -> int:
    return col_a + col_b
this should be slower than:
def col_c(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b
Because this one is vectorized, so the function will get a whole partition, rather than a row…
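Concretely, those two signatures map onto two different pieces of Spark's UDF machinery, something like this (an illustration, not Hamilton's internals; assumes df has long columns a and b):

import pandas as pd
import pyspark.sql.functions as F

@F.udf(returnType="long")   # plain UDF: called once per row, with Python ints
def add_rowwise(col_a: int, col_b: int) -> int:
    return col_a + col_b

@F.pandas_udf("long")       # pandas UDF: called once per batch, with pd.Series
def add_vectorized(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b

df = df.withColumn("c_row", add_rowwise("a", "b"))
df = df.withColumn("c_vec", add_vectorized("a", "b"))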
Stefan Krawczyk
03/12/2024, 5:09 AM
def col_c(my_df: ps.DataFrame) -> ps.Column:
    return my_df.a + my_df.b
Luke
03/12/2024, 5:13 AM
# vectorized Pandas UDF
def col_c(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b
(entered via with_columns(columns_to_pass=...))

# pure PySpark
def col_c(my_df: ps.DataFrame) -> ps.Column:
    return my_df.a + my_df.b
(entered via with_columns(pass_dataframe_as=...))

I'd think the pure PySpark implementation wins more often. For this particular operation, they're probably similar enough… But in general, I think PySpark will be more performant. I need to build more intuition and find more evidence but this is a helpful conversation so far.
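A hypothetical way to gather that evidence; build_pandas_udf_df and build_pure_pyspark_df are stand-ins for drivers built with the two with_columns configurations above, not real functions:

import time

def time_path(label, build_df):
    # build_df is a stand-in returning the pyspark DataFrame for one path
    start = time.perf_counter()
    build_df().count()  # count() forces the full plan to execute
    print(f"{label}: {time.perf_counter() - start:.2f}s")

time_path("pandas UDF path (columns_to_pass)", build_pandas_udf_df)
time_path("pure PySpark path (pass_dataframe_as)", build_pure_pyspark_df)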