Asked by: 小点点

How to trigger a Glue job using the AWS Glue operator


My script has only one task, which triggers a Glue job. I was able to create the DAG. Below is my DAG code.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator


### glue job specific variables
glue_job_name = "my_glue_job"
glue_iam_role = "AWSGlueServiceRole"
region_name = "us-west-2"
email_recipient = "me@gmail.com"

default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5),
    'email': email_recipient,
    'email_on_failure': True
}


with DAG(dag_id='glue_af_pipeline', default_args=default_args, schedule_interval=None) as dag:

    glue_job_step = AwsGlueJobOperator(
        task_id='glue_job_step',
        job_name=glue_job_name,
        script_location='s3://my-s3-location',
        region_name=region_name,
        iam_role_name=glue_iam_role,
        script_args=None,
        num_of_dpus=10,
        dag=dag,
    )

    glue_job_step

When I run the DAG, it fails with the following error:

[2020-10-13 08:27:14,315] {logging_mixin.py:112} INFO - [2020-10-13 08:27:14,315] {glue.py:114} ERROR - Failed to run aws glue job, error: Parameter validation failed: Invalid type for parameter Arguments, value: [], type:

Any suggestions would be appreciated.
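For context on the error: the operator starts the run through Glue's StartJobRun API, whose Arguments parameter is a dict mapping string keys to string values. botocore checks parameter types on the client side before sending the request, so an empty list (value: []) is rejected with "Parameter validation failed". Below is a minimal sketch of preparing a value of the right type. build_glue_arguments is a hypothetical helper (not part of Airflow or boto3), and it assumes the common Glue convention of naming job parameters with a "--" prefix.

```python
from collections import abc
from typing import Dict, Mapping, Optional


def build_glue_arguments(params: Optional[Mapping[str, object]] = None) -> Dict[str, str]:
    """Build a value for StartJobRun's Arguments parameter (hypothetical helper).

    Glue expects a mapping of string keys to string values; keys are given a
    "--" prefix if they do not already have one (assumed naming convention).
    """
    if params is None:
        return {}  # no job parameters: an empty dict, not an empty list
    if not isinstance(params, abc.Mapping):
        # A list such as [] is exactly the type botocore rejects for Arguments.
        raise TypeError(
            f"Glue job arguments must be a mapping, got {type(params).__name__}"
        )
    arguments: Dict[str, str] = {}
    for key, value in params.items():
        name = str(key)
        if not name.startswith("--"):
            name = "--" + name
        arguments[name] = str(value)  # Glue argument values are strings
    return arguments
```

If the job does take parameters, a dict built this way could be passed as the operator's script_args; for example, build_glue_arguments({"input_path": "s3://my-bucket/input"}) returns {"--input_path": "s3://my-bucket/input"} (the bucket path is a placeholder).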


1 answer

Anonymous user

If you are running an existing Glue job, try this:

# script_location and script_args are omitted: the job already exists in Glue
glue_job_step = AwsGlueJobOperator(
    task_id="glue_job_step",
    job_name=glue_job_name,  # name of the existing Glue job
    job_desc=f"triggering glue job {glue_job_name}",
    region_name=region_name,
    iam_role_name=glue_iam_role,
    num_of_dpus=1,
    dag=dag,
)

If the job takes no input arguments, remove script_args.
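To check that the existing job runs at all, independently of Airflow, the same Glue API calls can be made directly with boto3. This is a minimal sketch, not the operator's own code: run_existing_glue_job is a hypothetical name, and glue_client is assumed to be a boto3 Glue client such as boto3.client("glue", region_name="us-west-2"). It starts the job with start_job_run, always passing Arguments as a dict, and polls get_job_run until the run leaves its in-progress states.

```python
import time
from typing import Any, Dict, Optional

# Run states treated as "still in progress" (assumed list); any other
# state returned by get_job_run is treated as final.
IN_PROGRESS_STATES = {"STARTING", "RUNNING", "STOPPING", "WAITING"}


def run_existing_glue_job(
    glue_client: Any,
    job_name: str,
    arguments: Optional[Dict[str, str]] = None,
    poll_seconds: float = 30.0,
) -> str:
    """Start an existing Glue job and wait until its run finishes.

    Returns the final JobRunState string, e.g. "SUCCEEDED" or "FAILED".
    """
    # Arguments must be a dict; fall back to an empty dict, never a list.
    response = glue_client.start_job_run(JobName=job_name, Arguments=arguments or {})
    run_id = response["JobRunId"]
    while True:
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = job_run["JobRunState"]
        if state not in IN_PROGRESS_STATES:
            return state
        time.sleep(poll_seconds)  # wait before asking Glue again
```

A call would look like run_existing_glue_job(boto3.client("glue", region_name="us-west-2"), "my_glue_job"). Passing the client in, rather than creating it inside the function, lets the caller pick credentials and region and keeps the sketch testable with a stub.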