# hamilton-help
s
@Luke in that example `ps.DataFrame` is a pyspark dataframe, not a pandas-on-spark one. PySpark dataframes can’t be used with `extract_columns` because there is no index that goes along with the columns.
In the `pandas_on_spark` example you’ll see `ps` is `import pyspark.pandas as ps`. With pandas on spark they implemented some hidden columns underneath to act as indexes, so it can behave like regular pandas.
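For illustration, a minimal sketch of that hidden-index behavior, assuming a local SparkSession and made-up column names (not taken from the example):
```python
import pyspark.pandas as ps  # pandas-on-spark, as in the example
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# A plain pyspark DataFrame has no row index; converting it to
# pandas-on-spark (Spark 3.2+) adds the hidden index columns underneath:
psdf = sdf.pandas_api()              # pyspark.pandas.DataFrame
print(type(psdf) is ps.DataFrame)    # True
print(psdf["id"].index)              # a pandas-style index, like regular pandas
```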
Is there something particular you’re trying to do?
I mean — I suppose we could add something that converts it to pandas on spark dataframe and then does it — but then why not just make the function do that explicitly?
l
I was only thinking one step ahead. I saw no reason `extract_columns` couldn’t just run `df.select(columns[0])`, `df.select(columns[1])`, etc., creating multiple `ps.DataFrame` nodes. As you say though, without an index these nodes are useless. Explicitly transforming to `pandas_on_spark` makes more sense.
👍 1
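A hedged sketch of that explicit route, loosely modeled on the pandas_on_spark example (whether `@extract_columns` accepts a pandas-on-spark frame out of the box depends on the spark plugin being registered with Hamilton; the function and column names here are made up):
```python
import pyspark.pandas as pd_on_spark
import pyspark.sql as ps
from hamilton.function_modifiers import extract_columns


@extract_columns("id", "label")  # hypothetical column names
def converted_df(raw_df: ps.DataFrame) -> pd_on_spark.DataFrame:
    """Explicitly convert to pandas-on-spark so downstream column nodes have an index."""
    return raw_df.pandas_api()
```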
is `ps` here also `pyspark.pandas`?
s
No I believe that’s this column.
l
does that column preserve an index?
s
```python
def foo(df: ps.DataFrame) -> ps.Column:
    return df["bar"] + 1
```
This function is supposed to be used in conjunction with the `@with_columns` decorator.
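For context, a hedged sketch of how a function like that gets wired in with the pyspark `with_columns` decorator (modeled on the pyspark example README; exact decorator arguments may differ, and `final_df` and the column names are made up):
```python
import pyspark.sql as ps
from hamilton.plugins.h_spark import with_columns


def foo(df: ps.DataFrame) -> ps.Column:
    # the function from the snippet above
    return df["bar"] + 1


@with_columns(
    foo,                      # functions (or a module of them) defining new columns
    pass_dataframe_as="df",   # the parameter name foo uses for the upstream dataframe
    select=["foo"],           # columns to append to the output dataframe
)
def final_df(input_df: ps.DataFrame) -> ps.DataFrame:
    return input_df
```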
l
ok. Why is it being defined as a `ps.Column` type but then referenced as a `pd.Series` type here — https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pyspark#columnar-operations
```python
import pandas as pd, pyspark.sql as ps

# map_transforms.py

# ps.Column type
def column_1_from_dataframe(input_dataframe: ps.DataFrame) -> ps.Column:
    return input_dataframe.column_1_from_dataframe

# ...

# referenced as a pd.Series?
def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return _some_transform(column_1_from_dataframe)
```
s
> does that column preserve an index?
No index in pyspark. But yes, this is a map transform, so rows would be preserved.
> Why is it being defined as a `ps.Column` type but then referenced as a `pd.Series` type
yeah so this has to do with UDFs and what we’re doing in the background for you.
👍 1
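Roughly speaking, a `pd.Series` signature corresponds to a vectorized (pandas) UDF in plain pyspark. The sketch below just illustrates that mechanism; it is not Hamilton’s actual generated code, and the return type and column names are assumptions:
```python
import pandas as pd
import pyspark.sql.functions as F


# A pd.Series -> pd.Series function becomes a vectorized UDF: Spark hands it
# Arrow batches of rows as pandas Series instead of calling it row by row.
@F.pandas_udf("double")  # return type is an assumption here
def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return column_1_from_dataframe + 1.0


# Applied to a dataframe it would look something like:
# df.withColumn("column_3", column_3(F.col("column_1_from_dataframe")))
```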
l
from the docs:
> Approach (2) requires functions that take in pyspark dataframes and return pyspark dataframes or columns for the functions reading directly from the dataframe. If you want to stay in pandas entirely for the `with_columns` group, you should use approach (1).
So could the transformations of the columns (e.g. `column_3` above) accept `column_1_from_dataframe` as a `ps.Column`, or must it be `pd.Series` because of a behind-the-scenes conversion? The docs imply approach (2), `pass_dataframe_as`, allows pure PySpark.
s
IIRC the signatures are either pyspark dataframe in and column as output, or you’re creating a vectorized UDF, which is denoted by pandas Series types. [edit] Yep, further down there’s this UDF part. [/edit]
But whether
```python
def column_3(column_1_from_dataframe: ps.Column) -> ps.Column:
    return column_1_from_dataframe + 1
```
works I’d have to check the code / run it 😬
👍 1
l
> yeah so this has to do with UDFs and what we’re doing in the background for you.
So is `pyspark.pandas` ultimately being used to execute the transformations, or are Pandas UDFs a distinct computation pathway?
Not sure if the Spark optimizer attempts to run some pandas UDF code as `pyspark.pandas` calls or is doing something else.
s
> So is `pyspark.pandas` ultimately being used to execute the transformations or are Pandas UDFs a distinct computation pathway?
distinct computation path.
> Not sure if the Spark optimizer attempts to run some pandas UDF code as `pyspark.pandas` calls or is doing something else.
Pandas UDFs predate pandas on spark coming into mainline pyspark IIRC. So I think they’re quite separate.
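To make the separation concrete, a hedged sketch of the two paths (the pandas-on-spark `.spark` accessor details may vary by Spark version; column names are made up):
```python
import pandas as pd
import pyspark.pandas as pps
import pyspark.sql.functions as F

# Path 1: pandas-on-spark -- a pandas-like API compiled down to Spark
# column expressions; no Python UDF is involved for simple arithmetic.
psdf = pps.DataFrame({"a": [1, 2, 3]})
psdf["b"] = psdf["a"] + 1
psdf.spark.explain()  # shows a plain Spark plan


# Path 2: a pandas (vectorized) UDF -- your Python function runs over
# Arrow batches of rows shipped between the JVM and Python.
@F.pandas_udf("long")
def plus_one(s: pd.Series) -> pd.Series:
    return s + 1
```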
l
right on 👌. Good to know there’s no additional Hamilton magic happening under the hood. Is it reasonable to assume the following performance ranking: `spark.sql.Column` > `spark.pandas.Column` > `pandas.Series` (although these are being converted to Pandas UDFs)? This is the sort of performance rank I’d expect without Hamilton.
I reckon the performance ranking will shuffle depending on operation type and data size, but this is a generalization.
s
It’s mainly whether it’s doing a vectorized operation underneath, or whether it’s row level.
🤔 1
So I think things can be equivalent — especially since pyarrow makes it so that zero copy happens between JVM and python in the vectorized cases.
l
I’ll run some tests to build intuition
👍 1
s
e.g. this is a row-level UDF
```python
def col_c(col_a: int, col_b: int) -> int:
    return col_a + col_b
```
this should be slower than:
```python
def col_c(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b
```
Because this one is vectorized — so the function will get a whole partition, rather than a row…
But I’m not sure if pyspark would optimize this to be vectorized or not
```python
def col_c(my_df: ps.DataFrame) -> ps.Column:
    return my_df.a + my_df.b
```
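One hedged way to check is to look at the physical plan: pure column expressions stay as native Project nodes, while pandas UDFs show up as an ArrowEvalPython stage (the dataframe and column names below are made up):
```python
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])

# Pure column expression: stays in the JVM, shows up as a Project node.
df.withColumn("c", df.a + df.b).explain()


@F.pandas_udf("long")
def add(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + b


# Vectorized UDF: the plan gains an ArrowEvalPython stage.
df.withColumn("c", add(df.a, df.b)).explain()
```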
l
gotcha. Between
```python
# vectorized Pandas UDF
def col_c(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b
```
(entered via `with_columns(columns_to_pass)`) and
```python
# pure PySpark
def col_c(my_df: ps.DataFrame) -> ps.Column:
    return my_df.a + my_df.b
```
(entered via `with_columns(pass_dataframe_as)`), I’d think the pure PySpark implementation wins more often. For this particular operation, they’re probably similar enough… But in general, I think PySpark will be more performant. I need to build more intuition and find more evidence, but this is a helpful conversation so far.
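A rough, hedged sketch of a comparison to build that intuition (results will vary a lot with data size, cluster setup, and Arrow batch sizes; the dataframe here is synthetic):
```python
import time

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10_000_000).select(
    F.col("id").alias("col_a"), (F.col("id") * 2).alias("col_b")
)


@F.pandas_udf("long")
def col_c_udf(col_a: pd.Series, col_b: pd.Series) -> pd.Series:
    return col_a + col_b


candidates = {
    "pure pyspark column expression": df.withColumn("col_c", F.col("col_a") + F.col("col_b")),
    "vectorized pandas UDF": df.withColumn("col_c", col_c_udf("col_a", "col_b")),
}

for name, plan in candidates.items():
    start = time.perf_counter()
    plan.select(F.sum("col_c")).collect()  # force execution of the whole plan
    print(f"{name}: {time.perf_counter() - start:.2f}s")
```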
s
Cool — I’d read the blog post 😉 (I think Hamilton could set testing this up fairly easily)
(also you might be interested in this thread if you’re going down the spark route - https://hamilton-opensource.slack.com/archives/C03M33QB4M8/p1709307971137559)