在Airflow中編寫執(zhí)行MySQL查詢的任務(wù)可以通過使用PythonOperator
來執(zhí)行查詢的Python函數(shù)。以下是一個簡單的示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}
dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')
def execute_mysql_query():
# 連接MySQL數(shù)據(jù)庫
conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")
cursor = conn.cursor()
# 執(zhí)行查詢
cursor.execute("SELECT * FROM table")
# 獲取結(jié)果
rows = cursor.fetchall()
for row in rows:
print(row)
# 關(guān)閉連接
conn.close()
mysql_task = PythonOperator(
task_id='execute_mysql_query',
python_callable=execute_mysql_query,
dag=dag
)
mysql_task
在這個例子中,首先創(chuàng)建了一個DAG,并定義了一個Python函數(shù)execute_mysql_query
,該函數(shù)連接到MySQL數(shù)據(jù)庫,執(zhí)行查詢并打印結(jié)果。然后使用PythonOperator
來執(zhí)行這個函數(shù),并將其添加到DAG中。當(dāng)DAG運(yùn)行時,該任務(wù)將連接到MySQL數(shù)據(jù)庫執(zhí)行查詢。