Orel Yamin
08/12/2024, 11:35 AM
airflow.providers.apache.flink.operators.flink_kubernetes
D. Draco O'Brien
08/12/2024, 11:42 AM
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator
# Shared task settings; handed to the DAG constructor as ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 4, 1),
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # One retry, attempted five minutes after the failed try.
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# DAG that runs once a day and carries the shared task defaults.
# NOTE(review): ``catchup`` is not set, so Airflow's configured default
# applies; with a 2023 start_date that may schedule backfill runs — confirm
# this is intended before deploying.
dag = DAG(
    dag_id='flink_job_on_k8s',
    description='Submit a Flink job to Kubernetes',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)
# Define the task that submits the Flink job to Kubernetes.
#
# Only arguments that FlinkKubernetesOperator (or BaseOperator) actually
# accepts are passed here. The earlier ``flink_cluster_id`` and ``log_level``
# keywords are not parameters of the operator; Airflow rejects unknown
# operator arguments, so the DAG would fail to parse with them present.
# NOTE(review): the deployment/cluster name is presumably taken from the
# manifest in ``application_file`` (metadata.name) — confirm against your YAML.
submit_flink_job = FlinkKubernetesOperator(
    task_id='submit_flink_job',
    application_file='path/to/your/flink_job.yaml',  # Path to your Flink job YAML configuration
    kubernetes_conn_id='kubernetes_default',  # The connection ID for your Kubernetes cluster
    do_xcom_push=True,
    dag=dag,
)
# Assuming you don't have any downstream tasks, but if you do, you'd set them up here.
D. Draco O'Brien
08/12/2024, 11:47 AM
Orel Yamin
08/12/2024, 12:00 PM
D. Draco O'Brien
08/12/2024, 12:14 PM
D. Draco O'Brien
08/12/2024, 12:15 PM
D. Draco O'Brien
08/12/2024, 12:16 PM
Orel Yamin
08/12/2024, 12:16 PM
D. Draco O'Brien
08/12/2024, 12:20 PM
D. Draco O'Brien
08/12/2024, 12:20 PM
Orel Yamin
08/12/2024, 2:43 PM
Scott Robertson
08/13/2024, 12:02 PM
Orel Yamin
08/13/2024, 12:09 PM
Scott Robertson
08/13/2024, 12:12 PM
Orel Yamin
08/14/2024, 9:48 AM
D. Draco O'Brien
08/14/2024, 10:23 AM