溫馨提示×

如何在Airflow中編寫執(zhí)行MySQL查詢的任務(wù)

小樊
91
2024-08-10 22:38:44
欄目: 云計算

在Airflow中編寫執(zhí)行MySQL查詢的任務(wù)可以通過使用PythonOperator來執(zhí)行查詢的Python函數(shù)。以下是一個簡單的示例:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}

dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')

def execute_mysql_query():
    # 連接MySQL數(shù)據(jù)庫
    conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")
    cursor = conn.cursor()
    
    # 執(zhí)行查詢
    cursor.execute("SELECT * FROM table")
    
    # 獲取結(jié)果
    rows = cursor.fetchall()
    for row in rows:
        print(row)
    
    # 關(guān)閉連接
    conn.close()

mysql_task = PythonOperator(
    task_id='execute_mysql_query',
    python_callable=execute_mysql_query,
    dag=dag
)

mysql_task

在這個例子中,首先創(chuàng)建了一個DAG,并定義了一個Python函數(shù)execute_mysql_query,該函數(shù)連接到MySQL數(shù)據(jù)庫,執(zhí)行查詢并打印結(jié)果。然后使用PythonOperator來執(zhí)行這個函數(shù),并將其添加到DAG中。當(dāng)DAG運(yùn)行時,該任務(wù)將連接到MySQL數(shù)據(jù)庫執(zhí)行查詢。

0