# contribute-code
h
Don't want to promise too much yet, but the ML Platform team at Wolt is looking into adding support for ingesting Flyte workflows/tasks as DataFlows/DataJobs. Figuring out inputs and outputs might be tricky but 🤞
m
That's super cool @high-hospital-85984
b
W00T!
h
@mammoth-bear-12532 @big-carpet-38439 We (at Wolt) now have some crude scaffolding for a pull-type ingestion pipeline, but while talking this over with the Flyte guys it's come to our attention that they have actually been looking into the problem already (a push-type approach): https://github.com/unionai/flyteevents-datahub We do have some concerns, though, regarding the extensibility of Union's approach (how do we attach our custom SQL parsers to it? How do we "transform" a path input for a given task into the ML Model entity it actually is?). I invited @billions-lawyer-59647 to this merry bunch, in the hopes that we could bang our heads together and figure out how this problem would be solved in the best possible way. WDYT?
m
Hey @billions-lawyer-59647 welcome! and thanks @high-hospital-85984 for connecting the projects together!
b
Hello all, sorry I was lost in the sea of Slack orgs. Hi @mammoth-bear-12532, fantastic to meet you. We do have a few folks who want Flyte and DataHub to work together - would love to collaborate.
@high-hospital-85984 can you help me understand what you mean by SQL parsers? Also, what we have was a community contribution - it's just a prototype. But yes, Flyte has an event egress, and that would make it easier to add the right metadata.
h
@billions-lawyer-59647 I'm not sure if you're familiar with the Airflow-DataHub integration? It taps into Airflow's lineage functionality and implements a `LineageBackend` that emits metadata events to DataHub. Airflow's lineage functionality takes the "output" object of an upstream task and makes it the input of any downstream task, but the "inlet"/"outlet" objects are not parsed from the arguments to the task; they need to be defined manually (or passed through task relationships). This works, but requires manual interaction, and is a bit simplistic when it comes to more dynamic inputs (say a complicated SQL query to a generic Snowflake operator). For this reason, what we've done at Wolt is build functionality around some key helper functions (`upload_to_s3`, `run_snowflake_query`) that parses the inputs of those functions, deduces the S3 buckets or Snowflake tables actually involved in the process, and appends them to the task's `inlets`/`outlets` lists. Making our users do this manually is out of the question 😅 We've also helped the lineage backend distinguish between Snowflake tables, S3 buckets, and, say, external APIs, even though they are all Datasets in DataHub's eyes. So we'd like to replicate some of this functionality in Flyte. The bare minimum would be being able to define the inputs/outputs (and their types) of tasks, including external sources (Snowflake, S3, etc), manually. The next step would be being able to build custom plugins/transformers that can parse the arguments to a task and create the metadata entities. If we could dynamically add metadata to the task from inside the task, at runtime, that would be even better.
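To make this a bit more concrete, here's a rough sketch of the kind of wrapper we build around `run_snowflake_query` (the names, the `sqllineage`-based parsing and the DataHub `Dataset` helper usage here are simplified stand-ins, not our actual code):

```python
# Hypothetical, simplified sketch: a wrapper that parses the query it is about to
# run and appends the referenced Snowflake tables to the Airflow task's inlets.
from typing import List

from sqllineage.runner import LineageRunner      # any SQL lineage parser would do
from datahub_provider.entities import Dataset    # Dataset helper from DataHub's Airflow plugin


def _tables_in_query(sql: str) -> List[str]:
    """Best-effort extraction of the source tables referenced by a query."""
    return [str(table) for table in LineageRunner(sql).source_tables()]


def run_snowflake_query(sql: str, task, **params):
    """Run a query and register the tables it reads as inlets on the running task.

    `task` is the currently executing operator; the actual query execution is elided.
    """
    for table in _tables_in_query(sql):
        task.inlets.append(Dataset("snowflake", table))
    # ... execute the query against Snowflake and return the result ...
```

The lineage backend then picks up `inlets`/`outlets` when the task finishes, so users never have to declare them by hand.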
Wolt is happy to help, but it would be good if we could together come up with an RFC for how Flyte wants to support this sort of metadata tracking.
b
I agree that we should do an RFC.
But, for me to understand,
1. Flyte already has inputs and outputs, right?
2. When you write Snowflake queries in Flyte, you can still specify `inputs` and `outputs`. That being said, you may still want to parse the query, probably to figure out table joins etc.
3. Flyte currently has an event egress. This is still early work. It should send the following things as events:
   a. Indicate when a workflow starts, stops, fails
   b. Indicate when a task starts, stops, fails
   c. You should be able to get the actual task itself (for example the Snowflake query), as the version is tied to the event
   d. You will also always get the inputs and outputs for every task & workflow execution
   e. Should this not be enough to construct arbitrary lineage?
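Very roughly, here's what a consumer of that egress could then do - purely a sketch, where the event fields and the DataHub emitter calls are simplified assumptions, not the flyteevents-datahub code:

```python
# Sketch: turn a "task succeeded" event from the Flyte egress into dataset lineage
# in DataHub. The shape of `event` here is an assumption, not the real schema.
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")


def handle_task_succeeded(event: dict) -> None:
    # Assume the event (or the task template it points at) gives us the resolved
    # input tables and the output dataset for this execution.
    upstream_urns = [
        builder.make_dataset_urn(platform="snowflake", name=table)
        for table in event.get("input_tables", [])
    ]
    downstream_urn = builder.make_dataset_urn(platform="flyte", name=event["output_dataset"])
    emitter.emit_mce(builder.make_lineage_mce(upstream_urns, downstream_urn))
```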
h
Yes, sorry, I might have jumped over a couple of steps 😅 To answer your last question: maybe, let's figure it out together! To illustrate where I'm coming from, here's an example. We have a simple workflow with three tasks:
1. Makes a (parameterized) query to Snowflake to load some training data. The inputs to the task are maybe the connection details and the parameter to the query (say, the date). The query itself is defined in the task code. The output is what we might call a Flyte object, adhering to some schema defined in the task definition.
2. Takes the Flyte object as input, does some preprocessing using Pandas, and outputs a new Flyte object.
3. Takes the second Flyte object and, say, an S3 bucket address as inputs, trains a model, and uploads the model to said S3 bucket. There might not be any output from this task as far as Flyte is concerned; the uploading of the model is a side effect.
In the first task, the upstream lineage should consist of all the tables used in the Snowflake query (this is impossible to parse only from the inputs of the task), while the date parameter has no importance. The downstream lineage is a "Flyte dataset" (this is easy to parse, and you're already doing it in the flyteevents-datahub repo it seems). In the second task, the lineage consists of two "Flyte datasets" (again, trivial). In the third task, the upstream lineage is the second "Flyte dataset" (easy), but the downstream lineage would actually be an MLModel (in DataHub's terminology). Figuring out this downstream lineage is again very challenging.
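For concreteness, here's a minimal flytekit-style sketch of that workflow (the Snowflake query, the training and the S3 upload are stubbed out, so only the shape of the lineage problem is visible):

```python
# Minimal sketch of the three-task workflow described above; details are stubbed.
import pandas as pd
from flytekit import task, workflow


@task
def load_training_data(run_date: str) -> pd.DataFrame:
    # The query lives inside the task code; the tables it reads are what the
    # upstream lineage of this task should point at, not the `run_date` parameter.
    query = "SELECT * FROM analytics.orders WHERE order_date = :run_date"  # illustrative only
    # ... run `query` against Snowflake; stubbed with an empty frame here ...
    return pd.DataFrame({"order_id": [], "amount": []})


@task
def preprocess(raw: pd.DataFrame) -> pd.DataFrame:
    # Plain Pandas preprocessing; both sides of the lineage are "Flyte datasets".
    return raw.dropna()


@task
def train_and_upload(features: pd.DataFrame, s3_bucket: str) -> None:
    # Train a model and upload it to `s3_bucket` as a side effect; Flyte sees no
    # output, but the downstream lineage is really an MLModel in that bucket.
    ...


@workflow
def training_pipeline(run_date: str, s3_bucket: str) -> None:
    raw = load_training_data(run_date=run_date)
    features = preprocess(raw=raw)
    train_and_upload(features=features, s3_bucket=s3_bucket)
```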
b
For 1, would you be parsing the query? That is exposed by the Flyte task template, but otherwise I don't know how Flyte can help. I don't follow the meaning of downstream in 3.
Let's do a quick call?
cc @billions-apartment-18513 / @modern-pilot-97597
m
hey @high-hospital-85984 - want to do a call some time this week? what day works best for you?
h
Let's! @modern-pilot-97597 first off, in which timezone are you? I'm based in Finland so EEST.
m
I would love to attend as well if time zones are not an issue!
b
@high-hospital-85984, we're in Pacific Daylight Time (PDT), which is UTC-7. @mammoth-bear-12532, what timezone are you in? How does next Thursday (May-19th) at 4pm EEST look?
m
PDT as well!
h
I'm UTC+3, so 4pm is 2am for me. Any possibility for morning times, say like 9am or 10am PDT? 😅
b
sure, 9am PDT works. So next Thu (May-19th) 9am PDT then?
h
Works for me! Can you send an invite to make it official?
b
amazing, will send the invite shortly. @mammoth-bear-12532, which email should I use for you?
1
@dazzling-judge-80093, @big-carpet-38439 mentioned you might be interested in integrating with Flyte as well.
1
just sent the invite. Please ping here if you also want to be invited.
👍 1
A bit of a scheduling mishap on my part. Is it ok if we move the meeting to next Tuesday (May-24th) around the same time? @high-hospital-85984, @mammoth-bear-12532, @dazzling-judge-80093
d
I have to host a meetup on that day; can we have it after Tuesday?
h
23rd-25th is going to be challenging for me as well
b
sure thing, let's shoot for Thu May-26th at the same time then?
h
Works for me!
d
It works for me as well
b
great, thanks for the flexibility. I just updated the invite.
d
@billions-apartment-18513 I just realized that at that time we will have a DataHub Town Hall, and I will be demoing there as well. Please, can we move it to later that day or to another day?
b
oh, of course! I guess it has to be another day given the timezones involved.
@dazzling-judge-80093, if that's ok with you, we're going to talk to Fredrik since we already had the meeting scheduled. I'm fairly certain this will be the first of many opportunities to collaborate and drive this integration forward.
teamwork 1