# troubleshooting
o
Trying to find any examples of triggering a Flink Deployment on Kubernetes using Airflow (I have the official Flink Operator installed in Kubernetes already). Did anyone succeed in doing so? I'm trying to use
```
airflow.providers.apache.flink.operators.flink_kubernetes
```
d
Here is an example of how you could do that with an Airflow DAG (Directed Acyclic Graph).
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'flink_job_on_k8s',
    default_args=default_args,
    description='Submit a Flink job to Kubernetes',
    schedule_interval=timedelta(days=1),
)

# Define the task to submit the Flink job
submit_flink_job = FlinkKubernetesOperator(
    task_id='submit_flink_job',
    application_file='path/to/your/flink_job.yaml',  # Path to your Flink job YAML configuration
    kubernetes_conn_id='kubernetes_default',  # The connection ID for your Kubernetes cluster
    flink_cluster_id='my-flink-cluster',  # The ID of your Flink cluster as recognized by the Flink Operator
    log_level='INFO',
    do_xcom_push=True,
    dag=dag,
)

# Assuming you don't have any downstream tasks, but if you do, you'd set them up here.
```
If you haven't set up `kubernetes_conn_id`, you'll need to add a Kubernetes connection in the Airflow UI under Admin > Connections and make sure its ID matches what's in the DAG. Also, `my-flink-cluster` should match the name or identifier of the Flink cluster as managed by the Flink Operator in your Kubernetes cluster.
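For reference, the `application_file` usually points at a FlinkDeployment manifest for the official Flink Kubernetes Operator. Here's a minimal sketch of what `flink_job.yaml` might look like; the name, image, versions, and jar path are all placeholders you'd swap for your own:
```yaml
# Hypothetical flink_job.yaml for the Flink Kubernetes Operator.
# All names, versions, and resource values below are placeholders.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"   # values must be strings
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```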
o
Thanks for your reply, Draco. Wondering what `flink_cluster_id` should be if I want to trigger a new Flink Deployment, meaning there isn't any Flink cluster before triggering the DAG.
d
I think it's kind of set up to work with an existing Flink cluster. The Flink Operator probably expects this, and you'd use it first to deploy the Flink cluster on K8s.
DAGs in Airflow mostly deploy jobs onto existing Flink clusters.
I think it's just an error condition to execute the Airflow DAG if no Flink cluster exists yet.
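If you want that two-step flow, the Flink Operator also has a FlinkSessionJob resource for submitting a job to a session cluster that's already running. A rough sketch, where `deploymentName` and `jarURI` are placeholders:
```yaml
# Hypothetical manifest: submit a job to an existing session cluster
# managed by the Flink Kubernetes Operator.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job
spec:
  deploymentName: my-flink-cluster  # must match an existing FlinkDeployment
  job:
    jarURI: https://example.com/path/to/your-job.jar  # placeholder
    parallelism: 2
    upgradeMode: stateless
```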
o
Given that I run Airflow on Kubernetes, should I change the value of `kubernetes_conn_id` from its default?
d
It's for the Kubernetes cluster where the Flink jobs are going to run.
And it's configurable in the Airflow UI, which may or may not be running on the same K8s cluster.
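If Airflow does run in the same cluster, in-cluster config is usually enough. One sketch of defining the connection outside the UI, assuming Airflow 2.3+ (which accepts JSON-serialized connections in `AIRFLOW_CONN_*` env vars) and a provider version that reads the `in_cluster` extra:
```bash
# Hypothetical: define the kubernetes_default connection via env var,
# using in-cluster credentials instead of a kubeconfig file.
export AIRFLOW_CONN_KUBERNETES_DEFAULT='{"conn_type": "kubernetes", "extra": {"in_cluster": true}}'
```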
o
Much appreciated @D. Draco O'Brien
s
ooooo..... this looks neat. I didn't even think about the possibility of managing Flink deployments with Airflow. Is there a clear benefit to doing this? What's the use case where you'd want to integrate Flink with Airflow? We're currently tossing around ideas about managing k8s deployment YAMLs with a variety of methods, and this is a new one.
o
Flink has two execution modes, batch and streaming. You can read more about it here: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/execution_mode/
I would use Airflow to schedule batch Flink jobs, but not streaming ones.
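The mode is set on the job itself. For instance, in PyFlink it's one call on the execution environment; a minimal sketch, where the pipeline is just a placeholder:
```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()

# BATCH runs over bounded input and terminates, which maps cleanly onto a
# scheduled Airflow DAG. STREAMING runs indefinitely, so a scheduler that
# expects tasks to finish is a poor fit for it.
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

# Placeholder pipeline: any bounded source works in BATCH mode.
env.from_collection([1, 2, 3]).print()
env.execute("batch_example")
```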
s
Yep. Makes sense.
o
@D. Draco O'Brien have you ever succeeded in running a Flink job using Airflow?
d
No, I've never done that, but there's no reason it can't work.