airflow.models.dag

Module Contents

airflow.models.dag.log[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]
Returns the last dag run for a dag, None if there was none.
Last dag run can be any type of run eg. scheduled or backfilled.
Overridden DagRuns are ignored.
class airflow.models.dag.DAG(dag_id, description=None, schedule_interval=timedelta(days=1), start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.Undefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=conf.getint('core', 'dag_concurrency'), max_active_runs=conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=None, orientation=conf.get('webserver', 'dag_orientation'), catchup=conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, tags=None)[source]

Bases: airflow.dag.base_dag.BaseDag, airflow.utils.log.logging_mixin.LoggingMixin

A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule, (say daily or hourly), the DAG needs to run each individual tasks as their dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can’t run until their previous schedule (and upstream tasks) are completed.

DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.

Parameters
  • dag_id (str) – The id of the DAG

  • description (str) – The description for the DAG to e.g. be shown on the webserver

  • schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule

  • start_date (datetime.datetime) – The timestamp from which the scheduler will attempt to backfill

  • end_date (datetime.datetime) – A date beyond which your DAG won’t run, leave to None for open ended scheduling

  • template_searchpath (str or list[str]) – This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default

  • template_undefined (jinja2.Undefined) – Template undefined type.

  • user_defined_macros (dict) – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.

  • user_defined_filters (dict) – a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG.

  • default_args (dict) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.

  • params (dict) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.

  • concurrency (int) – the number of task instances allowed to run concurrently

  • max_active_runs (int) – maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won’t create new active DAG runs

  • dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns, and only once the # of active DagRuns == max_active_runs.

  • sla_miss_callback (types.FunctionType) – specify a function to call when reporting SLA timeouts.

  • default_view (str) – Specify DAG default view (tree, graph, duration, gantt, landing_times)

  • orientation (str) – Specify DAG orientation in graph view (LR, TB, RL, BT)

  • catchup (bool) – Perform scheduler catchup (or only run latest)? Defaults to True

  • on_failure_callback (callable) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.

  • on_success_callback (callable) – Much like the on_failure_callback except that it is executed when the dag succeeds.

  • access_control (dict) – Specify optional DAG-level permissions, e.g., “{‘role1’: {‘can_dag_read’}, ‘role2’: {‘can_dag_read’, ‘can_dag_edit’}}”

  • is_paused_upon_creation (bool or None) – Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used.

  • jinja_environment_kwargs (dict) –

    additional configuration options to be passed to Jinja Environment for template rendering

    Example: to avoid Jinja from removing a trailing newline from template strings

    DAG(dag_id='my-dag',
        jinja_environment_kwargs={
            'keep_trailing_newline': True,
            # some other jinja2 Environment options here
        }
    )
    

    See: Jinja Environment documentation

  • tags (List[str]) – List of tags to help filtering DAGS in the UI.

_comps[source]
__serialized_fields :Optional[FrozenSet[str]][source]
dag_id[source]
full_filepath[source]
concurrency[source]
access_control[source]
description[source]
description_unicode[source]
pickle_id[source]
tasks[source]
task_ids[source]
filepath[source]

File location of where the dag object is instantiated

folder[source]

Folder location of where the DAG object is instantiated.

owner[source]

Return list of all owners found in DAG tasks.

Returns

Comma separated list of owners in DAG tasks

Return type

str

allow_future_exec_dates[source]
concurrency_reached[source]

Returns a boolean indicating whether the concurrency limit for this DAG has been reached

is_paused[source]

Returns a boolean indicating whether this DAG is paused

normalized_schedule_interval[source]

Returns Normalized Schedule Interval. This is used internally by the Scheduler to schedule DAGs.

  1. Converts Cron Preset to a Cron Expression (e.g @monthly to 0 0 1 * *)

  2. If Schedule Interval is “@once” return “None”

  3. If not (1) or (2) returns schedule_interval

latest_execution_date[source]

Returns the latest date for which at least one dag run exists

subdags[source]

Returns a list of the subdag objects associated to this DAG

roots[source]

Return nodes with no parents. These are first to execute and are called roots or root nodes.

leaves[source]

Return nodes with no children. These are last to execute and are called leaves or leaf nodes.

__repr__(self)[source]
__eq__(self, other)[source]
__ne__(self, other)[source]
__lt__(self, other)[source]
__hash__(self)[source]
__enter__(self)[source]
__exit__(self, _type, _value, _tb)[source]
get_default_view(self)[source]

This is only there for backward compatible jinja2 templates

date_range(self, start_date, num=None, end_date=timezone.utcnow())[source]
is_fixed_time_schedule(self)[source]

Figures out if the DAG schedule has a fixed time (e.g. 3 AM).

Returns

True if the schedule has a fixed time, False if not.

following_schedule(self, dttm)[source]

Calculates the following schedule for this dag in UTC.

Parameters

dttm – utc datetime

Returns

utc datetime

previous_schedule(self, dttm)[source]

Calculates the previous schedule for this dag in UTC

Parameters

dttm – utc datetime

Returns

utc datetime

get_run_dates(self, start_date, end_date=None)[source]

Returns a list of dates between the interval received as parameter using this dag’s schedule interval. Returned dates can be used for execution dates.

Parameters
  • start_date (datetime) – the start date of the interval

  • end_date (datetime) – the end date of the interval, defaults to timezone.utcnow()

Returns

a list of dates within the interval following the dag’s schedule

Return type

list

normalize_schedule(self, dttm)[source]

Returns dttm + interval unless dttm is first interval then it returns dttm

get_last_dagrun(self, session=None, include_externally_triggered=False)[source]
_get_concurrency_reached(self, session=None)[source]
_get_is_paused(self, session=None)[source]
handle_callback(self, dagrun, success=True, reason=None, session=None)[source]

Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a ‘reason’, primarily to differentiate DagRun failures.

Parameters
  • dagrun – DagRun object

  • success – Flag to specify if failure or success callback should be called

  • reason – Completion reason

  • session – Database session

get_active_runs(self)[source]

Returns a list of dag run execution dates currently running

Returns

List of execution dates

get_num_active_runs(self, external_trigger=None, session=None)[source]

Returns the number of active “running” dag runs

Parameters
  • external_trigger (bool) – True for externally triggered active dag runs

  • session

Returns

number greater than 0 for active dag runs

get_dagrun(self, execution_date, session=None)[source]

Returns the dag run for a given execution date if it exists, otherwise none.

Parameters
  • execution_date – The execution date of the DagRun to find.

  • session

Returns

The DagRun if found, otherwise None.

get_dagruns_between(self, start_date, end_date, session=None)[source]

Returns the list of dag runs between start_date (inclusive) and end_date (inclusive).

Parameters
  • start_date – The starting execution date of the DagRun to find.

  • end_date – The ending execution date of the DagRun to find.

  • session

Returns

The list of DagRuns found.

_get_latest_execution_date(self, session=None)[source]
resolve_template_files(self)[source]
get_template_env(self)[source]

Build a Jinja2 environment.

set_dependency(self, upstream_task_id, downstream_task_id)[source]

Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()

get_task_instances(self, start_date=None, end_date=None, state=None, session=None)[source]
topological_sort(self)[source]

Sorts tasks in topographical order, such that a task comes after any of its upstream dependencies.

Heavily inspired by: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/

Returns

list of tasks in topological order

set_dag_runs_state(self, state=State.RUNNING, session=None, start_date=None, end_date=None)[source]
clear(self, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, reset_dag_runs=True, dry_run=False, session=None, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None)[source]

Clears a set of task instances associated with the current dag for a specified date range.

Parameters
  • start_date (datetime.datetime or None) – The minimum execution_date to clear

  • end_date (datetime.datetime or None) – The maximum exeuction_date to clear

  • only_failed (bool) – Only clear failed tasks

  • only_running (bool) – Only clear running tasks.

  • confirm_prompt (bool) – Ask for confirmation

  • include_subdags (bool) – Clear tasks in subdags and clear external tasks indicated by ExternalTaskMarker

  • include_parentdag (bool) – Clear tasks in the parent dag of the subdag.

  • reset_dag_runs (bool) – Set state of dag to RUNNING

  • dry_run (bool) – Find the tasks to clear but don’t clear them.

  • session (sqlalchemy.orm.session.Session) – The sqlalchemy session to use

  • get_tis (bool) – Return the sqlachemy query for finding the TaskInstance without clearing the tasks

  • recursion_depth (int) – The recursion depth of nested calls to DAG.clear().

  • max_recursion_depth (int) – The maximum recusion depth allowed. This is determined by the first encountered ExternalTaskMarker. Default is None indicating no ExternalTaskMarker has been encountered.

  • dag_bag (airflow.models.dagbag.DagBag) – The DagBag used to find the dags

classmethod clear_dags(cls, dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, reset_dag_runs=True, dry_run=False)[source]
__deepcopy__(self, memo)[source]
sub_dag(self, task_regex, include_downstream=False, include_upstream=True)[source]

Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.

has_task(self, task_id)[source]
get_task(self, task_id)[source]
pickle_info(self)[source]
pickle(self, session=None)[source]
tree_view(self)[source]

Print an ASCII tree representation of the DAG.

add_task(self, task)[source]

Add a task to the DAG

Parameters

task (task) – the task you want to add

add_tasks(self, tasks)[source]

Add a list of tasks to the DAG

Parameters

tasks (list of tasks) – a lit of tasks you want to add

run(self, start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=False, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False)[source]

Runs the DAG.

Parameters
  • start_date (datetime.datetime) – the start date of the range to run

  • end_date (datetime.datetime) – the end date of the range to run

  • mark_success (bool) – True to mark jobs as succeeded without running them

  • local (bool) – True to run the tasks using the LocalExecutor

  • executor (airflow.executor.BaseExecutor) – The executor instance to run the tasks

  • donot_pickle (bool) – True to avoid pickling DAG object and send to workers

  • ignore_task_deps (bool) – True to skip upstream tasks

  • ignore_first_depends_on_past (bool) – True to ignore depends_on_past dependencies for the first set of tasks only

  • pool (str) – Resource pool to use

  • delay_on_limit_secs (float) – Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached

  • verbose (bool) – Make logging output more verbose

  • conf (dict) – user defined dictionary passed from CLI

  • rerun_failed_tasks

  • run_backwards

Type

bool

Type

bool

cli(self)[source]

Exposes a CLI specific to this DAG

create_dagrun(self, run_id, state, execution_date=None, start_date=None, external_trigger=False, conf=None, session=None)[source]

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.

Parameters
  • run_id (str) – defines the the run id for this dag run

  • execution_date (datetime.datetime) – the execution date of this dag run

  • state (airflow.utils.state.State) – the state of the dag run

  • start_date (datetime) – the date this dag run should be evaluated

  • external_trigger (bool) – whether this dag run is externally triggered

  • conf (dict) – Dict containing configuration/parameters to pass to the DAG

  • session (sqlalchemy.orm.session.Session) – database session

sync_to_db(self, owner=None, sync_time=None, session=None)[source]

Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

Parameters
  • dag (airflow.models.DAG) – the DAG object to save to the DB

  • sync_time (datetime) – The time that the DAG should be marked as sync’ed

Returns

None

get_dagtags(self, session=None)[source]

Creating a list of DagTags, if one is missing from the DB, will insert.

Returns

The DagTag list.

Return type

list

static deactivate_unknown_dags(active_dag_ids, session=None)[source]

Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM

Parameters

active_dag_ids (list[unicode]) – list of DAG IDs that are active

Returns

None

static deactivate_stale_dags(expiration_date, session=None)[source]

Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.

Parameters

expiration_date (datetime) – set inactive DAGs that were touched before this time

Returns

None

static get_num_task_instances(dag_id, task_ids=None, states=None, session=None)[source]

Returns the number of task instances in the given DAG.

Parameters
  • session – ORM session

  • dag_id (unicode) – ID of the DAG to get the task concurrency of

  • task_ids (list[unicode]) – A list of valid task IDs for the given DAG

  • states (list[state]) – A list of states to filter by if supplied

Returns

The number of running tasks

Return type

int

test_cycle(self)[source]

Check to see if there are any cycles in the DAG. Returns False if no cycle found, otherwise raises exception.

_test_cycle_helper(self, visit_map, task_id)[source]

Checks if a cycle exists from the input task using DFS traversal

classmethod get_serialized_fields(cls)[source]

Stringified DAGs and operators contain exactly these fields.

class airflow.models.dag.DagTag[source]

Bases: airflow.models.base.Base

A tag name per dag, to allow quick filtering in the DAG view.

__tablename__ = dag_tag[source]
name[source]
dag_id[source]
__repr__(self)[source]
class airflow.models.dag.DagModel[source]

Bases: airflow.models.base.Base

__tablename__ = dag[source]

These items are stored in the database for state related information

dag_id[source]
root_dag_id[source]
is_paused_at_creation[source]
is_paused[source]
is_subdag[source]
is_active[source]
last_scheduler_run[source]
last_pickled[source]
last_expired[source]
scheduler_lock[source]
pickle_id[source]
fileloc[source]
owners[source]
description[source]
default_view[source]
schedule_interval[source]
tags[source]
__table_args__[source]
timezone[source]
safe_dag_id[source]
__repr__(self)[source]
static get_dagmodel(dag_id, session=None)[source]
classmethod get_current(cls, dag_id, session=None)[source]
get_default_view(self)[source]
get_last_dagrun(self, session=None, include_externally_triggered=False)[source]
static get_paused_dag_ids(dag_ids, session)[source]

Given a list of dag_ids, get a set of Paused Dag Ids

Parameters
  • dag_ids – List of Dag ids

  • session – ORM Session

Returns

Paused Dag_ids

get_dag(self, store_serialized_dags=False)[source]

Creates a dagbag to load and return a DAG. Calling it from UI should set store_serialized_dags = STORE_SERIALIZED_DAGS. There may be a delay for scheduler to write serialized DAG into database, loads from file in this case. FIXME: remove it when webserver does not access to DAG folder in future.

create_dagrun(self, run_id, state, execution_date, start_date=None, external_trigger=False, conf=None, session=None)[source]

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.

Parameters
  • run_id (str) – defines the the run id for this dag run

  • execution_date (datetime.datetime) – the execution date of this dag run

  • state (airflow.utils.state.State) – the state of the dag run

  • start_date (datetime.datetime) – the date this dag run should be evaluated

  • external_trigger (bool) – whether this dag run is externally triggered

  • session (sqlalchemy.orm.session.Session) – database session

set_is_paused(self, is_paused, including_subdags=True, store_serialized_dags=False, session=None)[source]

Pause/Un-pause a DAG.

Parameters
  • is_paused – Is the DAG paused

  • including_subdags – whether to include the DAG’s subdags

  • store_serialized_dags – whether to serialize DAGs & store it in DB

  • session – session

classmethod deactivate_deleted_dags(cls, alive_dag_filelocs, session=None)[source]

Set is_active=False on the DAGs for which the DAG files have been removed. Additionally change is_active=False to True if the DAG file exists.

Parameters
  • alive_dag_filelocs – file paths of alive DAGs

  • session – ORM Session