I’ve been working with Airflow recently, and I needed to configure an operator by passing it the execution time. Or at least I thought so. I really felt like writing findings down, because if you’re in the dark here, like I was today, there are some fundamental Airflow principles you still need to wrap your head around.
Let’s simulate what cause the error first.
- My DAG contained the task my_hive_task, which uses the HiveOperator.
- The query (passed as a relative path to an sql file to the hql parameter) contained a reference to the end_date variable.
- I was trying to fill it with the predefined ds Airflow variable, which contains the execution date of the DAG run.
with DAG(...) as dag: ... my_hive_task = HiveOperator( hql = '/my_hiveql_query.sql', params = { **args, end_date = '{{ ds }}' }, task_id = '...', ) ...
The query in my_hiveql_query.sql contains the following query. It should be clear that I’m trying to filter a SELECT query based on the end_date param from my_hive_task.
SELECT * FROM my_schema.my_table WHERE timestamp < '{{ params.end_date }}'
However, that simply generated the following error in the Airflow GUI:
name 'ds' is not defined
If you want to understand what the issue at hand is here, it’s crucial that you understand how Airflow templates work.
- An Airflow template is a piece of code, a command, that has dynamic components that can be injected.
- This templating process is done by Jinja.
- Every time your DAG runs, Jinja turns a template into a rendered command, query, code, etc., depending on the operator.
Conclusion: You can’t simply evaluate variables and expressions wherever you want in your DAG file. Templates can only be used in operator parameters that support them.
For example, you can template the bash_command parameter of the Bash Operator.
with DAG(...) as dag: ... BashOperator( task_id = "echo_date", bash_command = "echo {{ ds }}" )
In my case, that meant using the ds variable inside the hql parameter — which is the content of my_hiveql_query.sql.
with DAG(...) as dag: ... my_hive_task = HiveOperator( hql = '/my_hiveql_query.sql', params = { **args }, task_id = '...', ) ...
The query in my_hiveql_query.sql contains the following query. It should be clear that I’m trying to filter a SELECT query based on the end_date param from my_hive_task.
SELECT * FROM my_schema.my_table WHERE timestamp < '{{ ds }}'
🤨 So how do I know if a parameter supports templates?
First of all, you can always look at the documentation of an operator. The Hive operator clearly states “(templated)” at the end of the hql documentation.
Second, you can check the source code of the operator. The source code for the Hive operator is accessible on GitHub. All you need to do is find the template_fields variable, which contains all the parameters that can be templated.
Great success!