Import 1, 7), catchup=False) def sync_dag(): def sync(): # Sync DAG (let's assume we have 2 like this that are pretty similar) from corators import task, dag Params = context # Access to context paramsĬreate_backup_env() > other_task() Trigger_sync_dag_2_task = def other_task(): Trigger_sync_dag_1_task = TriggerDagRunOperator( To use the TriggerDagRunOperator, we need to define something like this: # Wrapper DAG from corators import task, dagįrom _dagrun import TriggerDagRunOperatorįrom import get_current_contextįrom datetime import 1, 7), catchup=False) def wrapper_dag(): def create_backup_env(): The SolutionįYI - I simplified the solution a lot but always kept the main components untouched. Also, these DAGs cannot be executed manually or with a scheduled interval anymore but the Wrapper DAG instead, the create-backup-env task has to always be run first for the 2 DAGs to always push data to the same env and don't push to old envs that will not be used anymore.įurthermore, the 2 DAGs can receive quite many config parameters to execute or not certain tasks using the Trigger DAG w/config feature that Airflow provides, so these parameters have to be also available in the Wrapper DAG. The proposed solution was to create a new DAG (which I'll call Wrapper from now on) that first runs this create-backup-env task and then triggers the 2 DAGs using the TriggerDagRunOperator. With this, the 2 DAGs cannot run async anymore, they have to sync the data to the same environment. If anything goes wrong, we can just switch the environment and delete the broken one. The sync process between the 2 data sources is not free of failures so, a new need come up, which was to first create a backup of the env and then sync the data to a new env that is a copy of the old one. Until now, both DAGs were run individually, updating the CMS environment async. Each DAG syncs a specific type of data to the same env. I had 2 DAGs that run at the same time (with the same schedule_interval) and synced data from the ERP to the CMS. I had 2 data sources, an ERP and one content environment (from now on I'll call it 'env') from a CMS (if you don't know what a CMS is, I explain a little bit about it in this post). If you want to go straight to the solution you can skip this section. Maybe I was just not experienced enough and I fell into a really easy thing to fix but, today I'll show how to do it, so you don't have to struggle as I did □ let's get into it. So I was in this situation, struggling for like 5 hours yesterday (yes, the last 5 Friday work hours, the best ones to get stuck with some code) trying to pass parameters using the TriggerDagRunOperator, and wanting to die but at the end achieving it.
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |