
An overview of macros and other interesting variables in Kubeflow


Coming from Airflow, I expected macros to be all over the place in Kubeflow. Surprisingly, it took me a long time to scrape together the few pieces of documentation on how to use them. In this blog post, I'll try to give an overview of how and where you can use variables that relate to the current run.

In Airflow, you build templates and populate them with macros, which provide common Airflow functionality and variables related to the run the template is being executed in, like the execution date.

I expected the same in Kubeflow. To make a long story short: they exist, but there’s no official documentation. In the next section you’ll find an overview of the variables that relate to the run that the code is being executed in.

Execution IDs

Sometimes you’d like to store the ID of the execution in a database, for traceability reasons perhaps. To access the ID of the whole pipeline run, or the ID of the component run, you can use the following two variables:

  • kfp.dsl.RUN_ID_PLACEHOLDER: Returns the ID of the pipeline run
  • kfp.dsl.EXECUTION_ID_PLACEHOLDER: Returns the ID of the component run

🚩Keep in mind

  • You can use both variables when running a pipeline once, or in a scheduled run.
  • You cannot use these variables in the arguments (params) of your (recurring) run. If you try, you’ll encounter the following error:

invalid spec: templates.p-test-pipeline.tasks.hello-world failed to resolve {{pod.name}}

Inside the pipeline definition itself, however, you can pass them to a component without problems:

task_hello_world_pipeline_args = op_hello_world(pipeline_arg_current_date, 
                                                pipeline_arg_scheduled_time,
                                                kfp.dsl.RUN_ID_PLACEHOLDER, # pipeline run id
                                                kfp.dsl.EXECUTION_ID_PLACEHOLDER, # component run id
                                                pipeline_arg_index)

Schedule time, run time and run count

More important are the scheduled time and the run time. You’ll probably need them for smooth backfilling or for ensuring idempotency when your DAGs are executed. These can be accessed using the following macros.

  • [[CurrentTime]]: Returns the current time.
  • [[ScheduledTime]]: Returns the time that this run was scheduled on.
  • [[Index]]: Returns the number of runs that have already passed before this run.

Both CurrentTime and ScheduledTime can be formatted by providing a timestamp template. To simply get the date, you can use '[[ScheduledTime.2006-01-02]]'.
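The template follows Go's reference-date convention: you write the desired layout using the specific date 2006-01-02 15:04:05, and Kubeflow's backend fills in the actual time. To build intuition, here's a rough, purely illustrative sketch of that convention in Python strftime terms (the real substitution happens server-side, not in your code; the mapping below covers only the date/time parts used in this post):

```python
from datetime import datetime

# Go reference-date parts mapped to rough strftime equivalents
# (illustration only; Kubeflow resolves the macro server-side):
GO_TO_STRFTIME = {
    "2006": "%Y", "01": "%m", "02": "%d",
    "15": "%H", "04": "%M", "05": "%S",
}

def render(template: str, ts: datetime) -> str:
    """Mimic what '[[ScheduledTime.<template>]]' would expand to."""
    for go_part, strftime_part in GO_TO_STRFTIME.items():
        template = template.replace(go_part, strftime_part)
    return ts.strftime(template)

print(render("2006-01-02", datetime(2022, 1, 26, 10, 45)))  # 2022-01-26
```

This is why '[[ScheduledTime.2006-01-02]]' yields just the date: the layout names the year, month, and day positions of the reference date.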

🚩 Keep in mind

  • You cannot use these macros for configuring a single run. They can only be used when scheduling a recurring run.
  • You cannot use these macros inside a pipeline or a component, only in the arguments of a recurring run.
  • These macros should be formatted as a string; wrapped in single or double quotes.
  • You need to use Go’s magic reference date (2006-01-02 15:04:05) as the timestamp template; you can’t just use whatever date you want. Otherwise you’ll get very weird dates.
kfp.Client().create_recurring_run(
    <...>
    params = {
        'pipeline_arg_current_date': '[[CurrentTime.2006-01-02]]', # macro
        'pipeline_arg_scheduled_time': '[[ScheduledTime]]', # macro
        'pipeline_arg_index': '[[Index]]' # macro
    })

Workflow variables

Finally, you can also reference Argo’s workflow variables. Argo is Kubeflow’s workflow executor, and it has built-in variables that it automatically substitutes with their values. Some of these variables get really technical, like the name of the pod, the ID of the workflow, etc.

  • {{pod.name}}: Returns the name of the pod that the component is executed in.
  • {{workflow.uid}}: Returns the ID of the workflow.
  • {{workflow.creationTimestamp}}: Returns the timestamp when the workflow was created.

🚩 Keep in mind

  • You can use these in a component, in a pipeline and in the arguments of a pipeline configuration.
  • You should wrap these variables in single or double quotes to prevent errors.
  • All workflow variables can be found in the Argo Workflows documentation.
def hello_world(current_date:str, scheduled_time:str, run_id:str, execution_id:str, index:str) -> bool:
    print('The workflow started at {{workflow.creationTimestamp}}.') # Workflow variable
    return True

op_hello_world = create_component_from_func(hello_world)

Code

Below, you can find some chunks of code that you can use to try everything out.

First, the component.

from kfp.components import create_component_from_func

def hello_world(current_date:str, scheduled_time:str, run_id:str, execution_id:str, index:str) -> bool:
  
    print(f'The current date is {current_date}.')
    print(f'The scheduled time is {scheduled_time}.')
    print(f'The run id is {run_id}.')
    print(f'The execution id is {execution_id}.')
    print(f'The index of the run is {index}.')
    print('The workflow started at {{workflow.creationTimestamp}}.') # Workflow variable

    return True

op_hello_world = create_component_from_func(hello_world)

Next, the pipeline that uses the hello_world component.

import kfp
import kfp.dsl as dsl

@dsl.pipeline(name = pipeline_name, description = pipeline_description) # defined elsewhere
def hello_pipeline(pipeline_arg_current_date:str, 
                   pipeline_arg_scheduled_time:str, 
                   pipeline_arg_index:str):
    
    task_hello_world_pipeline_args = op_hello_world(pipeline_arg_current_date, 
                                                    pipeline_arg_scheduled_time,
                                                    kfp.dsl.RUN_ID_PLACEHOLDER, # pipeline run id
                                                    kfp.dsl.EXECUTION_ID_PLACEHOLDER, # component run id
                                                    pipeline_arg_index)
    
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(900)

Next, you can schedule the recurring run using the following code.

client = kfp.Client()
kf_exp = client.create_experiment(name = experiment_name)

pipeline_path = '<YOUR_PATH>'
kfp.compiler.Compiler().compile(hello_pipeline, pipeline_path, type_check = True)
kf_pipe = client.upload_pipeline(pipeline_package_path = pipeline_path, pipeline_name = pipeline_name)

client.create_recurring_run(
    pipeline_id = kf_pipe.id,
    experiment_id = kf_exp.id,
    job_name = job_name,
    cron_expression = '0 45 10 * * *', # six fields, including seconds: every day at 10:45:00
    start_time = '2022-01-25T12:30:00Z',
    max_concurrency = 1,
    no_catchup = False,
    enabled = True,
    params = {
        'pipeline_arg_current_date': '[[CurrentTime.2006-01-02]]', # macro
        'pipeline_arg_scheduled_time': '[[ScheduledTime]]', # macro
        'pipeline_arg_index': '[[Index]]' # macro
    })

If all of this runs properly, this will be the output.

The current date is 2022-01-26.
The scheduled time is 20220126104500.
The run id is 4318231d-124b-41f7-89f2-c5ab58ecafaf.
The execution id is 4315231d-129b-21f7-89f2-c5bb58ecafaf-p-test-pipeline-joblwdwm-1-3474223390-3542905297.
The index of the run is 1.
The workflow started at 2022-01-26 20:10:33 +0000 UTC.
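Note that the macro values arrive in your component as plain strings, in the formats shown above. A minimal sketch of how you might parse the scheduled time to build an idempotent partition path (so a backfilled rerun overwrites the same slice instead of creating a new one); the format string matches the sample output, but check what your own run produces:

```python
from datetime import datetime

# [[ScheduledTime]] arrives as e.g. '20220126104500' (see sample output)
scheduled_time = "20220126104500"

# Parse it, then derive a deterministic partition path from it
ts = datetime.strptime(scheduled_time, "%Y%m%d%H%M%S")
partition = ts.strftime("dt=%Y-%m-%d/hour=%H")
print(partition)  # dt=2022-01-26/hour=10
```

Because the path depends only on the scheduled time, running the same scheduled slot twice writes to the same location.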

Conclusion

Variables that relate to the current run are excellent for making your data pipelines idempotent. It’s somewhat weird that there’s no proper documentation on how to use them in Kubeflow. With this blog post I hope I’ve provided you with an overview of the possibilities.

Say thanks, ask questions or give feedback

Technologies get updated, syntax changes and honestly… I make mistakes too. If something is incorrect, incomplete or doesn’t work, let me know in the comments below and help thousands of visitors.
