`... FROM current_ds_wait::table1 INNER JOIN current_ds_wait::table2 ON ...`

Our framework parses the SQL snippets and extracts the table names after current_ds_wait::. These are the tables whose ds partition for the current day has to land before we can run the SELECT (otherwise the result would be incomplete).

I described ds partitions in an earlier post:

> The other major design pattern from Facebook is the idea of daily partitioned tables. This is a feature available on Hive, and not really practical on e.g. a traditional relational database. Essentially we store (complete) daily, write-once slices of each table, which are generated by daily jobs. The partitions are called ds at Facebook and logically show up as a column of the table; you'll find plenty of references to ds if you read the Hive docs (because Hive was written at Facebook).

Physically, these partitions are essentially directories, each one holding the data files for that day's data. We use S3, so in our case a partition looks something like s3://dwh-bucket/<table>/<ds>/, for example s3://dwh-bucket/company_metrics/<ds>/datafile.

So when our framework generates the DAG for this DWH job, it generates an insert task (a PrestoOperator), which depends on 3 wait tasks (DsPartitionSensor operators), one for each table. There are a bunch of other tasks that we generate as well (such as tasks for running CREATE TABLE IF NOT EXISTS), but let's ignore those here.
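The generator itself isn't shown in this post, but here is a minimal sketch of the idea, assuming hypothetical constructor signatures for the custom PrestoOperator and DsPartitionSensor operators mentioned above (the our_dwh_framework module and the build_dwh_dag helper are made-up names for illustration):

```python
import re
from datetime import datetime

from airflow import DAG
# The post's custom operators; the import path and constructor arguments
# used below are assumptions made for this sketch.
from our_dwh_framework.operators import PrestoOperator, DsPartitionSensor

WAIT_MARKER = re.compile(r"current_ds_wait::(\w+)")

def build_dwh_dag(dag_id, sql):
    """Build a DAG with one insert task that waits on every current_ds_wait:: table."""
    wait_tables = sorted(set(WAIT_MARKER.findall(sql)))  # e.g. ['table1', 'table2']
    clean_sql = WAIT_MARKER.sub(r"\1", sql)               # strip the markers before running

    dag = DAG(dag_id, start_date=datetime(2018, 1, 1), schedule_interval="@daily")
    insert = PrestoOperator(task_id="insert", sql=clean_sql, dag=dag)
    for table in wait_tables:
        wait = DsPartitionSensor(task_id="wait_%s" % table, table=table,
                                 ds="{{ ds }}", dag=dag)
        wait >> insert  # the insert runs only after every wait has succeeded
    return dag
```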
In the example above, we're waiting on 3 tables, so we generate 3 wait jobs. Originally these partition checks ran on Presto. Since all our DWH jobs run on Presto, our Hive execution engine is just sitting around idle, handling metadata queries such as CREATE TABLE (the create tasks in the DAG). So by running the SHOW PARTITIONS checks on Hive instead (the syntax starts out the same, but it's a bit different on Hive), we can get rid of 95% of the jobs on the Presto cluster, which were taking a long time to run even though they're just checking for the presence of a partition. The Hive engine handles these metadata queries easily, returning in less than a second.

The second thing we tried was Airflow's pool feature. With this, tasks can be assigned to pools, and per-pool limits can be set on execution. So if we have 32 worker slots, we can set up a wait pool with 24 slots, so that no more than 24 waits can be running at once. Unfortunately, this feature in Airflow is buggy/broken: in our setup, where we're running a separate master and worker and using Celery for running worker tasks, the Airflow scheduler doesn't respect the limits, similar to this bug report.

We then realized that the per-table waits are inefficient anyway, and we can just have one multi_wait task which checks all 3 partitions at once: we generate an HQL query with several SHOW PARTITIONS statements separated by semicolons, and parse the resulting string to see what's there and what's missing (a sketch of this is at the end of the post). This way each DAG only ever has one wait job running. The change was easy to make, because we don't construct our DAGs "by hand" for each table; we have a helper function which does this (and which also does the current_ds_wait:: parsing), so we only needed to make the change in one place.

Looking at the scheduler logs, we still have a problem: the multi_wait finishes at time X, but the insert is only launched at time X+5 minutes. Why does Airflow need 5 minutes to figure out that a task's dependencies are all finished? This is a generic issue with Airflow, not specific to waits. To understand it, I looked through the logs and found that indeed, the first time Airflow notices that the insert can run is several minutes after the multi_wait finishes. I then took the relevant log line and looked it up in the Airflow source code. What happens is this: every 30 seconds the Airflow scheduler (as configured) lists out all the .py files, and then in a "round-robin" manner executes them: it runs one.py, two.py, three.py, and so on, where each of the .py files is a DAG definition in our case. When the scheduler executes a .py file, it looks at the instances of the DAG class in the global namespace, and those are the DAGs it processes. The problem is, the Airflow scheduler only checks for new runnable tasks (i.e. all dependencies are finished) when it's running the appropriate .py file! This is a very unfortunate architectural choice.
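To make that last point concrete, each of those .py files is roughly a DAG definition like the following (a minimal, illustrative Airflow 1.x-style file; the scheduler re-imports it on every pass and picks up whatever DAG objects sit in the module's global namespace):

```python
# three.py -- one of the DAG definition files the scheduler re-executes on every pass
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# The scheduler imports this module and collects every DAG instance it finds in
# the global namespace; task dependencies are only re-checked at that point.
dag = DAG("company_metrics", start_date=datetime(2018, 1, 1),
          schedule_interval="@daily")

insert = BashOperator(task_id="insert",
                      bash_command="echo 'the INSERT would run here'",
                      dag=dag)
```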
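Going back to the multi_wait task: below is a rough sketch of the batched partition check, assuming the HQL is run through the hive -e CLI and that each existing partition is printed on its own line as ds=<date> (the table names and date are illustrative; the post only says the statements are concatenated and the output parsed):

```python
import subprocess

def all_partitions_landed(tables, ds):
    """Return True once every table in `tables` has its ds=<ds> partition."""
    # One HQL string with several SHOW PARTITIONS statements separated by ';'.
    hql = "; ".join("SHOW PARTITIONS %s" % t for t in tables)
    out = subprocess.check_output(["hive", "-e", hql]).decode()
    # Each table that already has today's partition prints one 'ds=<ds>' line,
    # so everything has landed once we count len(tables) exact matches.
    landed = sum(1 for line in out.splitlines() if line.strip() == "ds=%s" % ds)
    return landed == len(tables)

# The multi_wait sensor would poke this periodically until it returns True.
print(all_partitions_landed(["table1", "table2", "table3"], "2018-03-01"))
```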
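And for completeness, assigning a task to a pool looks roughly like this (the wait_pool pool, with e.g. 24 slots, is created beforehand in the Airflow UI under Admin -> Pools; pool is a standard operator argument, though as noted above the limit wasn't enforced in our Celery setup):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG("pool_example", start_date=datetime(2018, 1, 1), schedule_interval="@daily")

# Any task assigned to "wait_pool" counts against that pool's slot limit,
# so at most 24 such tasks should be running at any one time.
wait = DummyOperator(task_id="wait_table1", pool="wait_pool", dag=dag)
```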