
Methods for Managing Airflow

### Process management with Supervisord

  1. Install the process manager Supervisord to manage the Airflow processes

    easy_install supervisor   # this install method does not work for Python 3 (it causes many problems)
    echo_supervisord_conf > /etc/supervisord.conf
  2. Edit supervisord.conf and add the startup commands
    vi /etc/supervisord.conf

[program:airflow_web]
command=/usr/bin/airflow webserver -p 8080

[program:airflow_worker]
command=/usr/bin/airflow worker

[program:airflow_scheduler]
command=/usr/bin/airflow scheduler

  3. Start the supervisord service

/usr/bin/supervisord -c /etc/supervisord.conf

  4. The Airflow services can now be managed with supervisorctl

supervisorctl start airflow_web
supervisorctl stop airflow_web
supervisorctl restart airflow_web
supervisorctl stop all
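
Supervisord only re-reads its configuration on demand, so if the `[program:*]` sections are added or changed later, the standard supervisorctl subcommands below pick up the changes without restarting supervisord:

```bash
supervisorctl reread    # re-parse /etc/supervisord.conf
supervisorctl update    # apply added/changed/removed [program:*] sections
supervisorctl status    # show the state of airflow_web, airflow_worker and airflow_scheduler
```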

### Process management with systemd

  1. vim /etc/sysconfig/airflow  # systemd reads this file; it usually defines Airflow environment variables

AIRFLOW_CONFIG=/root/airflow/airflow.cfg
AIRFLOW_HOME=/root/airflow

  2. vim /usr/lib/systemd/system/airflow-webserver.service  # the unit name managed by systemctl; the other services can be defined the same way (a sketch follows the unit file below)

[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/ ; /usr/local/python3/bin/airflow webserver -p 8080 --pid /root/airflow/service/webserver.pid -A /root/airflow/service/webserver.out -E /root/airflow/service/webserver.err -l /root/airflow/service/webserver.log"

KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
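
Since the other services can be defined the same way, here is a minimal sketch of reusing the webserver unit as a template for the scheduler; the file names simply mirror the paths used above:

```bash
# Copy the unit and adjust it for the scheduler (sketch; edit ExecStart by hand afterwards).
cp /usr/lib/systemd/system/airflow-webserver.service /usr/lib/systemd/system/airflow-scheduler.service
# In the copy, change ExecStart to run `airflow scheduler` (with its own pid/log paths),
# then optionally enable the units so they start at boot:
systemctl enable airflow-webserver.service airflow-scheduler.service
```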

  3. systemctl daemon-reload  # load the new unit files
  4. systemctl status airflow-webserver.service  # check the service status; from now on the services can be managed this way

### Managing Airflow with a shell script

```bash
#!/bin/bash

usage(){
    echo -e "\nA tool used for managing airflow services\nUsage: airflow.sh {webserver|worker|scheduler|flower|all} {start|stop|status}"
}

#=== This is the function about airflow webserver service ===
webserver_status(){
    echo -e "\e[36m Checking service status, please wait ... \e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow[ -]webserver" | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m webserver is stopped !!! \e[0m"
    else
        echo -e "\e[32m webserver is running... \e[0m"
    fi
}

webserver_start(){
    echo -e "\e[36m Starting airflow webserver ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow webserver >> /root/airflow/service/webserver.log 2>&1 &
    webserver_status
}

webserver_stop(){
    echo -e "\e[36m Stopping airflow webserver ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow[ -]webserver" | grep -v grep | awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-webserver.pid
    webserver_status
}

#=== This is the function about airflow scheduler service ===
scheduler_status(){
    echo -e "\e[36m Checking service status, please wait ... \e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow[ -]scheduler" | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m scheduler is stopped !!! \e[0m"
    else
        echo -e "\e[32m scheduler is running... \e[0m"
    fi
}

scheduler_start(){
    echo -e "\e[36m Starting airflow scheduler ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow scheduler >> /root/airflow/service/scheduler.log 2>&1 &
    scheduler_status
}

scheduler_stop(){
    echo -e "\e[36m Stopping airflow scheduler ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow[ -]scheduler" | grep -v grep | awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-scheduler.pid
    scheduler_status
}

#=== This is the function about airflow flower service ===
flower_status(){
    echo -e "\e[36m Checking service status, please wait ... \e[0m"
    sleep 3
    Status=`netstat -anputl | grep 5555 | grep LISTEN | awk -F" " '{ print $7 }' | awk -F"/" '{ print $1 }' | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m flower is stopped !!! \e[0m"
    else
        echo -e "\e[32m flower is running... \e[0m"
    fi
}

flower_start(){
    echo -e "\e[36m Starting airflow flower ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow flower >> /root/airflow/service/flower.log 2>&1 &
    flower_status
}

flower_stop(){
    echo -e "\e[36m Stopping airflow flower ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `netstat -anputl | grep 5555 | grep LISTEN | awk -F" " '{ print $7 }' | awk -F"/" '{ print $1 }'`
    rm -rf /root/airflow/airflow-flower.pid
    flower_status
}

#=== This is the function about airflow worker service ===
worker_status(){
    echo -e "\e[36m Checking service status, please wait ... \e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow serve_logs" | grep -v grep | wc -l`
    celeryStatus=`ps -elf | grep celery | grep -v grep | wc -l`
    if [ $Status -eq 0 ]; then
        if [ $celeryStatus -eq 0 ]; then
            echo -e "\e[31m worker is stopped !!! \e[0m"
        else
            echo -e "\e[32m worker is running... \e[0m"
        fi
    else
        echo -e "\e[32m worker is running... \e[0m"
    fi
}

worker_start(){
    echo -e "\e[36m Starting airflow worker ... \e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow worker >> /root/airflow/service/worker.log 2>&1 &
    worker_status
}

worker_stop(){
    echo -e "\e[36m Stopping airflow worker ... \e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow serve_logs" | grep -v grep | awk -F" " '{ print $4 }'`
    /usr/bin/kill -9 `ps -elf | grep celery | grep -v grep | awk -F" " '{ print $4 }'`
    rm -rf /root/airflow/airflow-worker.pid
    worker_status
}

#=== This is the startup option for the airflow service ===
case "$2" in
  start)
    case "$1" in
      webserver)  webserver_start ;;
      worker)     worker_start ;;
      scheduler)  scheduler_start ;;
      flower)     flower_start ;;
      all)
        webserver_start
        scheduler_start
        flower_start
        worker_start
        ;;
      *)
        usage
        exit 2
    esac
    ;;
  stop)
    case "$1" in
      webserver)  webserver_stop ;;
      worker)     worker_stop ;;
      scheduler)  scheduler_stop ;;
      flower)     flower_stop ;;
      all)
        worker_stop
        flower_stop
        scheduler_stop
        webserver_stop
        ;;
      *)
        usage
        exit 3
    esac
    ;;
  status)
    case "$1" in
      webserver)  webserver_status ;;
      worker)     worker_status ;;
      scheduler)  scheduler_status ;;
      flower)     flower_status ;;
      all)
        webserver_status
        scheduler_status
        flower_status
        worker_status
        ;;
      *)
        usage
        exit 4
    esac
    ;;
  *)
    usage
    exit 1
esac
```

### Modifications for retrieving log information

  1. Go into incubator-airflow/airflow/www/
  2. Edit views.py
    Add the following code inside class Airflow(BaseView):
```python
@expose('/logs')
@login_required
@wwwutils.action_logging
def logs(self):
    BASE_LOG_FOLDER = os.path.expanduser(
        conf.get('core', 'BASE_LOG_FOLDER'))
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')
    dag = dagbag.get_dag(dag_id)
    log_relative = "{dag_id}/{task_id}/{execution_date}".format(**locals())
    loc = os.path.join(BASE_LOG_FOLDER, log_relative)
    loc = loc.format(**locals())
    log = ""
    TI = models.TaskInstance
    session = Session()
    dttm = dateutil.parser.parse(execution_date)
    ti = session.query(TI).filter(
        TI.dag_id == dag_id, TI.task_id == task_id,
        TI.execution_date == dttm).first()
    form = DateTimeForm(data={'execution_date': dttm})

    if ti:
        host = ti.hostname
        log_loaded = False

        if os.path.exists(loc):
            try:
                f = open(loc)
                log += "".join(f.readlines())
                f.close()
                log_loaded = True
            except:
                log = "*** Failed to load local log file: {0}.\n".format(loc)
        else:
            WORKER_LOG_SERVER_PORT = \
                conf.get('celery', 'WORKER_LOG_SERVER_PORT')
            url = os.path.join(
                "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
            ).format(**locals())
            log += "*** Log file isn't local.\n"
            log += "*** Fetching here: {url}\n".format(**locals())
            try:
                import requests
                timeout = None  # No timeout
                try:
                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                except (AirflowConfigException, ValueError):
                    pass

                response = requests.get(url, timeout=timeout)
                response.raise_for_status()
                log += '\n' + response.text
                log_loaded = True
            except:
                log += "*** Failed to fetch log file from worker.\n".format(
                    **locals())

        if not log_loaded:
            # load remote logs
            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
            remote_log = os.path.join(remote_log_base, log_relative)
            log += '\n*** Reading remote logs...\n'

            # S3
            if remote_log.startswith('s3:/'):
                log += log_utils.S3Log().read(remote_log, return_error=True)
            # GCS
            elif remote_log.startswith('gs:/'):
                log += log_utils.GCSLog().read(remote_log, return_error=True)
            # unsupported
            elif remote_log:
                log += '*** Unsupported remote log location.'

        session.commit()
        session.close()

    if PY2 and not isinstance(log, unicode):
        log = log.decode('utf-8')

    title = "Log"
    return wwwutils.json_response(log)
```
  3. Restart the service and visit a URL such as:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

This returns the log of that task for execution_date=2018-01-11.

### Deleting a DAG

There is currently no official API exposed for deleting a DAG, and a complete deletion touches several tables, so the SQL for deleting a DAG is summarized as follows:

set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
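
A minimal way to run these statements, assuming a MySQL metadata database named `airflow` and that the statements above are saved as `delete_dag.sql` (the file name and credentials are hypothetical):

```bash
# Back up the metadata database first, then run the deletion statements in one session.
mysqldump -u root -p airflow > airflow_backup.sql
mysql -u root -p airflow < delete_dag.sql
```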

### Cluster management scripts

#### Cluster service start script

```bash
#!/usr/bin/env bash

function usage() {
    echo -e "\n A tool used for starting airflow services\n Usage: 200.sh {webserver|worker|scheduler|flower}"
}

PORT=8081
ROLE=webserver
ENV_ARGS=""

check_alive() {
    PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
    [ -n "$PID" ] && return 0 || return 1
}

check_scheduler_alive() {
    PIDS=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
    [ -n "$PIDS" ] && return 0 || return 1
}

function get_host_ip() {
    local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

start_service() {
    if [ $ROLE = 'scheduler' ];then
        check_scheduler_alive
    else
        check_alive
    fi
    if [ $? -ne 0 ];then
        nohup airflow $ROLE $ENV_ARGS > $BASE_LOG_DIR/$ROLE/$ROLE.log 2>&1 &
        sleep 5
        if [ $ROLE = 'scheduler' ];then
            check_scheduler_alive
        else
            check_alive
        fi
        if [ $? -ne 0 ];then
            echo "service start error"
            exit 1
        else
            echo "service start success"
            exit 0
        fi
    else
        echo "service already started"
        exit 0
    fi
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "starting airflow webserver"
            ROLE=webserver
            PORT=8081
            start_service
            ;;
        worker)
            echo "starting airflow worker"
            ROLE=worker
            PORT=8793
            local host_ip=$(get_host_ip)
            ENV_ARGS="-cn ${host_ip}@${host_ip}"
            start_service
            ;;
        flower)
            echo "starting airflow flower"
            ROLE=flower
            PORT=5555
            start_service
            ;;
        scheduler)
            echo "starting airflow scheduler"
            ROLE=scheduler
            start_service
            ;;
        *)
            usage
            exit 1
    esac
}

main "$@"
```
#### Cluster service stop script
```bash
#!/usr/bin/env bash

function usage() {
    echo -e "\n A tool used for stopping airflow services\n Usage: 200.sh {webserver|worker|scheduler|flower}"
}

function get_host_ip() {
    local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "stopping airflow webserver"
            cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
            ;;
        worker)
            echo "stopping airflow worker"
            PORT=8793
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            local host_ip=$(get_host_ip)
            ps -ef | grep celeryd | grep ${host_ip}@${host_ip} | awk '{print $2}' | xargs kill -9
            ;;
        flower)
            echo "stopping airflow flower"
            PORT=5555
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            ;;
        scheduler)
            echo "stopping airflow scheduler"
            PID=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
            kill -9 $PID
            ;;
        *)
            usage
            exit 1
    esac
}

main "$@"
```
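
A sketch of how the two scripts might be invoked on a node. `POOL`, `/data0/hcp/sbin/init-hcp.sh` and the file names are environment-specific assumptions; the usage strings above refer to both scripts as 200.sh:

```bash
export POOL=my_airflow_pool        # required by both scripts; the value depends on your environment
bash airflow-online.sh worker      # hypothetical name for the start script above
bash airflow-offline.sh worker     # hypothetical name for the stop script above
```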

### Changing the Airflow time zone

Airflow uses UTC by default; in China the local time is UTC+8. The steps below switch Airflow over to the China time zone by patching the source. They target Airflow 1.10.0; other versions differ only in details and can be adapted accordingly.

  1. In the Airflow home directory, edit airflow.cfg and set
    default_timezone = Asia/Shanghai

  2. Go to where the airflow package is installed, i.e. the site-packages directory; the files below are given relative to it.
    This is where my airflow package is installed (adjust for your own environment):
    cd /usr/local/python3/lib/python3.6/site-packages/airflow

  3. Edit utils/timezone.py

```python
# Add the following below the line `utc = pendulum.timezone('UTC')` (line 27):
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass

# Modify the utcnow() function (line 69):
# original:  d = dt.datetime.utcnow()
# change to: d = dt.datetime.now()
```
  4. Edit utils/sqlalchemy.py

```python
# Add the following below the line `utc = pendulum.timezone('UTC')` (line 37):
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass
```

Also comment out `cursor.execute("SET time_zone = '+00:00'")` in utils/sqlalchemy.py (line 124).

  5. Edit www/templates/admin/master.html (line 31)

```js
// Change
var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
// to
var UTCseconds = x.getTime();

// and change
"timeFormat":"H:i:s %UTC%",
// to
"timeFormat":"H:i:s",
```
  6. Finally, restart the airflow-webserver.
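
A quick sanity check (a sketch, assuming Airflow 1.10.0 patched as above and installed under /usr/local/python3): the patched `utcnow()` should now report local time, matching `date`:

```bash
/usr/local/python3/bin/python3 -c "from airflow.utils import timezone; print(timezone.utcnow())"
date
```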