您好,登錄后才能下訂單哦!
我就廢話不多說了,直接上代碼吧!
""" 對于每天存儲文件,文件數(shù)量過多,占用空間 采用保存最新的三個文件 """ from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.models import Variable from sctetl.airflow.utils import dateutils from datetime import datetime,timedelta import logging import os import shutil """ base_dir = "/data" data_dir = "/gather" "gather下邊存在不同的文件夾" "/data/gather/test" "test路徑下有以下文件夾" "20180812、20180813、20180814、20180815、20180816" """ base_dir = Variable.get("base_dir") data_dir = Variable.get("data_dir") keep = 3 default_arg = { "owner":"airflow", "depends_on_past":False, "start_date":dateutils.get_start_date_local(2018,8,27,18,5), "email":[''], "email_on_failure":False, "email_on_retry":False, "retries":1, "retry_delay":timedelta(minutes=5) } dag = DAG(dag_id="keep_three_day",default_args=default_arg,schedule_interval=dateutils.get_schedule_interval_local(18,5)) def keep_three_day(): path = os.path.join(base_dir, data_dir) date_cates = os.listdir(path) for cate in date_cates: p = os.path.join(base_dir, data_dir, cate) if os.path.isdir(p): dir_names = os.listdir(p) dir_names.sort() for i in dir_names[:-keep]: logging.info("刪除目錄 {path}".format(path=os.path.join(p, i))) shutil.rmtree(os.path.join(p, i)) with dag: keep_three_file = PythonOperator(task_id="keep_three_file",python_callable=keep_three_day(),dag=dag) keep_three_file
以上這篇python 實現(xiàn)保存最新的三份文件,其余的都刪掉就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。