API Reference

Operators

Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. All operators derive from BaseOperator and inherit many attributes and methods that way. Refer to the BaseOperator documentation for more details.

There are 3 main types of operators:

  • Operators that performs an action, or tell another system to perform an action
  • Transfer operators move data from one system to another
  • Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

BaseOperator

All operators are derived from BaseOperator and acquire much functionality through inheritance. Since this is the core of the engine, it’s worth taking the time to understand the parameters of BaseOperator to understand the primitive features that can be leveraged in your DAGs.

class airflow.models.BaseOperator(**kwargs)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the ‘execute’ method.

Operators derived from this class should perform or trigger certain tasks synchronously (wait for completion). Example of operators could be an operator that runs a Pig job (PigOperator), a sensor operator that waits for a partition to land in Hive (HiveSensorOperator), or one that moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these operators (tasks) target specific operations, running specific scripts, functions or data transfers.

This class is abstract and shouldn’t be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a node in DAG objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods.

Parameters:
  • task_id (str) – a unique, meaningful id for the task
  • owner (str) – the owner of the task, using the unix username is recommended
  • retries (int) – the number of retries that should be performed before failing the task
  • retry_delay (datetime.timedelta) – delay between retries
  • retry_exponential_backoff (bool) – allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
  • max_retry_delay (datetime.timedelta) – maximum delay interval between retries
  • start_date (datetime.datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest execution_date and adds the schedule_interval to determine the next execution_date. It is also very important to note that different tasks’ dependencies need to line up in time. If task A depends on task B and their start_date are offset in a way that their execution_date don’t line up, A’s dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into the TimeSensor and TimeDeltaSensor. We advise against using dynamic start_date and recommend using fixed ones. Read the FAQ entry about start_date for more information.
  • end_date (datetime.datetime) – if specified, the scheduler won’t go beyond this date
  • depends_on_past (bool) – when set to true, task instances will run sequentially while relying on the previous task’s schedule to succeed. The task instance for the start_date is allowed to run.
  • wait_for_downstream (bool) – when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used.
  • queue (str) – which queue to target when running this job. Not all executors implement queue management, the CeleryExecutor does support targeting specific queues.
  • dag (airflow.models.DAG) – a reference to the dag the task is attached to (if any)
  • priority_weight (int) – priority weight of this task against other task. This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks.
  • weight_rule (str) – weighting method used for the effective total priority weight of the task. Options are: { downstream | upstream | absolute } default is downstream When set to downstream the effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks will have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and desire to have all upstream tasks to complete for all runs before each dag can continue processing downstream tasks. When set to upstream the effective weight is the aggregate sum of all upstream ancestors. This is the opposite where downtream tasks have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and prefer to have each dag complete before starting upstream tasks of other dags. When set to absolute, the effective weight is the exact priority_weight specified without additional weighting. You may want to do this when you know exactly what priority weight each task should have. Additionally, when set to absolute, there is bonus effect of significantly speeding up the task creation process as for very large DAGS. Options can be set as string or using the constants defined in the static class airflow.utils.WeightRule
  • pool (str) – the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
  • sla (datetime.timedelta) – time by which the job is expected to succeed. Note that this represents the timedelta after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on the 2016-01-02 if the 2016-01-01 instance has not succeeded yet. The scheduler pays special attention for jobs with an SLA and sends alert emails for sla misses. SLA misses are also recorded in the database for future reference. All tasks that share the same SLA time get bundled in a single email, sent soon after that time. SLA notification are sent once and only once for each task instance.
  • execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.
  • on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
  • on_retry_callback (callable) – much like the on_failure_callback except that it is executed when retries occur.
  • on_success_callback (callable) – much like the on_failure_callback except that it is executed when the task succeeds.
  • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | none_failed | dummy} default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule
  • resources (dict) – A map of resource parameter names (the argument names of the Resources constructor) to their values.
  • run_as_user (str) – unix username to impersonate while running the task
  • task_concurrency (int) – When set, a task will be able to limit the concurrent runs across execution_dates
  • executor_config (dict) –

    Additional task-level configuration parameters that are interpreted by a specific executor. Parameters are namespaced by the name of executor.

    Example: to run this task in a specific docker container through the KubernetesExecutor

    MyOperator(...,
        executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
            }
    )
    
  • do_xcom_push (bool) – if True, an XCom is pushed containing the Operator’s result
clear(**kwargs)[source]

Clears the state of task instances associated with the task, following the parameters specified.

dag

Returns the Operator’s DAG if set, otherwise raises an error

deps

Returns the list of dependencies for the operator. These differ from execution context dependencies in that they are specific to tasks and can be extended/overridden by subclasses.

downstream_list

@property: list of tasks directly downstream

execute(context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

get_direct_relative_ids(upstream=False)[source]

Get the direct relative ids to the current task, upstream or downstream.

get_direct_relatives(upstream=False)[source]

Get the direct relatives to the current task, upstream or downstream.

get_flat_relative_ids(upstream=False, found_descendants=None)[source]

Get a flat list of relatives’ ids, either upstream or downstream.

get_flat_relatives(upstream=False)[source]

Get a flat list of relatives, either upstream or downstream.

get_task_instances(session, start_date=None, end_date=None)[source]

Get a set of task instance related to this task for a specific date range.

has_dag()[source]

Returns True if the Operator has been assigned to a DAG.

on_kill()[source]

Override this method to cleanup subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.

post_execute(context, *args, **kwargs)[source]

This hook is triggered right after self.execute() is called. It is passed the execution context and any results returned by the operator.

pre_execute(context, *args, **kwargs)[source]

This hook is triggered right before self.execute() is called.

prepare_template()[source]

Hook that is triggered after the templated fields get replaced by their content. If you need your operator to alter the content of the file before the template is rendered, it should override this method to do so.

render_template(attr, content, context)[source]

Renders a template either from a file or directly in a field, and returns the rendered result.

render_template_from_field(attr, content, context, jinja_env)[source]

Renders a template from a field. If the field is a string, it will simply render the string and return the result. If it is a collection or nested set of collections, it will traverse the structure and render all elements in it. If the field has another type, it will return it as it is.

run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False)[source]

Run a set of task instances for a date range.

schedule_interval

The schedule interval of the DAG always wins over individual tasks so that tasks within a DAG always line up. The task still needs a schedule_interval as it may not be attached to a DAG.

set_downstream(task_or_task_list)[source]

Set a task or a task list to be directly downstream from the current task.

set_upstream(task_or_task_list)[source]

Set a task or a task list to be directly upstream from the current task.

upstream_list

@property: list of tasks directly upstream

xcom_pull(context, task_ids=None, dag_id=None, key=u'return_value', include_prior_dates=None)[source]

See TaskInstance.xcom_pull()

xcom_push(context, key, value, execution_date=None)[source]

See TaskInstance.xcom_push()

BaseSensorOperator

All sensors are derived from BaseSensorOperator. All sensors inherit the timeout and poke_interval on top of the BaseOperator attributes.

class airflow.sensors.base_sensor_operator.BaseSensorOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.models.skipmixin.SkipMixin

Sensor operators are derived from this class and inherit these attributes.

Sensor operators keep executing at a time interval and succeed when a criteria is met and fail if and when they time out.

Parameters:
  • soft_fail (bool) – Set to true to mark the task as SKIPPED on failure
  • poke_interval (int) – Time in seconds that the job should wait in between each tries
  • timeout (int) – Time, in seconds before the task times out and fails.
  • mode (str) – How the sensor operates. Options are: { poke | reschedule }, default is poke. When set to poke the sensor is taking up a worker slot for its whole execution time and sleeps between pokes. Use this mode if the expected runtime of the sensor is short or if a short poke interval is required. When set to reschedule the sensor task frees the worker slot when the criteria is not yet met and it’s rescheduled at a later time. Use this mode if the expected time until the criteria is met is. The poke interval should be more than one minute to prevent too much load on the scheduler.
deps

Adds one additional dependency for all sensor operators that checks if a sensor task instance can be rescheduled.

poke(context)[source]

Function that the sensors defined while deriving this class should override.

Core Operators

Operators

class airflow.operators.bash_operator.BashOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a Bash script, command or set of commands.

See also

For more information on how to use this operator, take a look at the guide: BashOperator

If BaseOperator.do_xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes

Parameters:
  • bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed. (templated)
  • env (dict) – If env is not None, it must be a mapping that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated)
  • output_encoding (str) – Output encoding of bash command

On execution of this operator the task will be up for retry when exception is raised. However, if a sub-command exits with non-zero value Airflow will not recognize it as failure unless the whole shell exits with a failure. The easiest way of achieving this is to prefix the command with set -e; Example:

bash_command = "set -e; python3 script.py '{{ next_execution_date }}'"
execute(context)[source]

Execute the bash command in a temporary directory which will be cleaned afterwards

class airflow.operators.python_operator.BranchPythonOperator(**kwargs)[source]

Bases: airflow.operators.python_operator.PythonOperator, airflow.models.skipmixin.SkipMixin

Allows a workflow to “branch” or follow a path following the execution of this task.

It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}. All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

Note that using tasks with depends_on_past=True downstream from BranchPythonOperator is logically unsound as skipped status will invariably lead to block tasks that depend on their past successes. skipped states propagates where all directly upstream tasks are skipped.

class airflow.operators.check_operator.CheckOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Performs checks against a db. The CheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average.

This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alerts without stopping the progress of the DAG.

Note that this is an abstract class and get_db_hook needs to be defined. Whereas a get_db_hook is hook that gets a single record from an external source.

Parameters:sql (str) – the sql to be executed. (templated)
class airflow.operators.docker_operator.DockerOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a command inside a docker container.

A temporary directory is created on the host and mounted into a container to allow storing files that together exceed the default disk size of 10GB in a container. The path to the mounted directory can be accessed via the environment variable AIRFLOW_TMP_DIR.

If a login to a private registry is required prior to pulling the image, a Docker connection needs to be configured in Airflow and the connection ID be provided with the parameter docker_conn_id.

Parameters:
  • image (str) – Docker image from which to create the container. If image tag is omitted, “latest” will be used.
  • api_version (str) – Remote API version. Set to auto to automatically detect the server’s version.
  • auto_remove (bool) – Auto-removal of the container on daemon side when the container’s process exits. The default is False.
  • command (str or list) – Command to be run in the container. (templated)
  • cpus (float) – Number of CPUs to assign to the container. This value gets multiplied with 1024. See https://docs.docker.com/engine/reference/run/#cpu-share-constraint
  • dns (list[str]) – Docker custom DNS servers
  • dns_search (list[str]) – Docker custom DNS search domain
  • docker_url (str) – URL of the host running the docker daemon. Default is unix://var/run/docker.sock
  • environment (dict) – Environment variables to set in the container. (templated)
  • force_pull (bool) – Pull the docker image on every run. Default is False.
  • mem_limit (float or str) – Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like 128m or 1g.
  • network_mode (str) – Network mode for the container.
  • tls_ca_cert (str) – Path to a PEM-encoded certificate authority to secure the docker connection.
  • tls_client_cert (str) – Path to the PEM-encoded certificate used to authenticate docker client.
  • tls_client_key (str) – Path to the PEM-encoded key used to authenticate docker client.
  • tls_hostname (str or bool) – Hostname to match against the docker server certificate or False to disable the check.
  • tls_ssl_version (str) – Version of SSL to use when communicating with docker daemon.
  • tmp_dir (str) – Mount point inside the container to a temporary directory created on the host by the operator. The path is also made available via the environment variable AIRFLOW_TMP_DIR inside the container.
  • user (int or str) – Default user inside the docker container.
  • volumes – List of volumes to mount into the container, e.g. ['/host/path:/container/path', '/host/path2:/container/path2:ro'].
  • working_dir (str) – Working directory to set on the container (equivalent to the -w switch the docker client)
  • xcom_all (bool) – Push all the stdout or just the last line. The default is False (last line).
  • docker_conn_id (str) – ID of the Airflow connection to use
  • shm_size (int) – Size of /dev/shm in bytes. The size must be greater than 0. If omitted uses system default.
class airflow.operators.druid_check_operator.DruidCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator

Performs checks against Druid. The DruidCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average. This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alterts without stopping the progress of the DAG.

Parameters:
  • sql (str) – the sql to be executed
  • druid_broker_conn_id (str) – reference to the druid broker
get_db_hook()[source]

Return the druid db api hook.

get_first(sql)[source]

Executes the druid sql to druid broker and returns the first resulting row.

Parameters:sql (str) – the sql statement to be executed (str)
class airflow.operators.dummy_operator.DummyOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Operator that does literally nothing. It can be used to group tasks in a DAG.

class airflow.operators.email_operator.EmailOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Sends an email.

Parameters:
  • to (list or string (comma or semicolon delimited)) – list of emails to send the email to. (templated)
  • subject (str) – subject line for the email. (templated)
  • html_content (str) – content of the email, html markup is allowed. (templated)
  • files (list) – file names to attach in email
  • cc (list or string (comma or semicolon delimited)) – list of recipients to be added in CC field
  • bcc (list or string (comma or semicolon delimited)) – list of recipients to be added in BCC field
  • mime_subtype (str) – MIME sub content type
  • mime_charset (str) – character set parameter added to the Content-Type header.
class airflow.operators.generic_transfer.GenericTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from a connection to another, assuming that they both provide the required methods in their respective hooks. The source hook needs to expose a get_records method, and the destination a insert_rows method.

This is meant to be used on small-ish datasets that fit in memory.

Parameters:
  • sql (str) – SQL query to execute against the source database. (templated)
  • destination_table (str) – target table. (templated)
  • source_conn_id (str) – source connection
  • destination_conn_id (str) – source connection
  • preoperator (str or list[str]) – sql statement or list of statements to be executed prior to loading the data. (templated)
class airflow.operators.hive_to_samba_operator.Hive2SambaOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes hql code in a specific Hive database and loads the results of the query as a csv to a Samba location.

Parameters:
  • hql (str) – the hql to be exported. (templated)
  • destination_filepath (str) – the file path to where the file will be pushed onto samba
  • samba_conn_id (str) – reference to the samba destination
  • hiveserver2_conn_id (str) – reference to the hiveserver2 service
class airflow.operators.hive_operator.HiveOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes hql code or hive script in a specific Hive database.

Parameters:
  • hql (str) – the hql to be executed. Note that you may also use a relative path from the dag file of a (template) hive script. (templated)
  • hive_cli_conn_id (str) – reference to the Hive database. (templated)
  • hiveconfs (dict) – if defined, these key value pairs will be passed to hive as -hiveconf "key"="value"
  • hiveconf_jinja_translate (bool) – when True, hiveconf-type templating ${var} gets translated into jinja-type templating {{ var }} and ${hiveconf:var} gets translated into jinja-type templating {{ var }}. Note that you may want to use this along with the DAG(user_defined_macros=myargs) parameter. View the DAG object documentation for more details.
  • script_begin_tag (str) – If defined, the operator will get rid of the part of the script before the first occurrence of script_begin_tag
  • mapred_queue (str) – queue used by the Hadoop CapacityScheduler. (templated)
  • mapred_queue_priority (str) – priority within CapacityScheduler queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
  • mapred_job_name (str) – This name will appear in the jobtracker. This can make monitoring easier.
class airflow.operators.hive_stats_operator.HiveStatsCollectionOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Gathers partition statistics using a dynamically generated Presto query, inserts the stats into a MySql table with this format. Stats overwrite themselves if you rerun the same date/partition.

CREATE TABLE hive_stats (
    ds VARCHAR(16),
    table_name VARCHAR(500),
    metric VARCHAR(200),
    value BIGINT
);
Parameters:
  • table (str) – the source table, in the format database.table_name. (templated)
  • partition (dict of {col:value}) – the source partition. (templated)
  • extra_exprs (dict) – dict of expression to run against the table where keys are metric names and values are Presto compatible expressions
  • col_blacklist (list) – list of columns to blacklist, consider blacklisting blobs, large json columns, …
  • assignment_func (function) – a function that receives a column name and a type, and returns a dict of metric names and an Presto expressions. If None is returned, the global defaults are applied. If an empty dictionary is returned, no stats are computed for that column.
class airflow.operators.hive_to_druid.HiveToDruidTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Hive to Druid, [del]note that for now the data is loaded into memory before being pushed to Druid, so this operator should be used for smallish amount of data.[/del]

Parameters:
  • sql (str) – SQL query to execute against the Druid database. (templated)
  • druid_datasource (str) – the datasource you want to ingest into in druid
  • ts_dim (str) – the timestamp dimension
  • metric_spec (list) – the metrics you want to define for your data
  • hive_cli_conn_id (str) – the hive connection id
  • druid_ingest_conn_id (str) – the druid ingest connection id
  • metastore_conn_id (str) – the metastore connection id
  • hadoop_dependency_coordinates (list[str]) – list of coordinates to squeeze int the ingest json
  • intervals (list) – list of time intervals that defines segments, this is passed as is to the json object. (templated)
  • hive_tblproperties (dict) – additional properties for tblproperties in hive for the staging table
  • job_properties (dict) – additional properties for job
construct_ingest_query(static_path, columns)[source]

Builds an ingest query for an HDFS TSV load.

Parameters:
  • static_path (str) – The path on hdfs where the data is
  • columns (list) – List of all the columns that are available
class airflow.operators.hive_to_mysql.HiveToMySqlTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Hive to MySQL, note that for now the data is loaded into memory before being pushed to MySQL, so this operator should be used for smallish amount of data.

Parameters:
  • sql (str) – SQL query to execute against Hive server. (templated)
  • mysql_table (str) – target MySQL table, use dot notation to target a specific database. (templated)
  • mysql_conn_id (str) – source mysql connection
  • hiveserver2_conn_id (str) – destination hive connection
  • mysql_preoperator (str) – sql statement to run against mysql prior to import, typically use to truncate of delete in place of the data coming in, allowing the task to be idempotent (running the task twice won’t double load data). (templated)
  • mysql_postoperator (str) – sql statement to run against mysql after the import, typically used to move data from staging to production and issue cleanup commands. (templated)
  • bulk_load (bool) – flag to use bulk_load option. This loads mysql directly from a tab-delimited text file using the LOAD DATA LOCAL INFILE command. This option requires an extra connection parameter for the destination MySQL connection: {‘local_infile’: true}.
class airflow.operators.check_operator.IntervalCheckOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

Note that this is an abstract class and get_db_hook needs to be defined. Whereas a get_db_hook is hook that gets a single record from an external source.

Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_threshold (dict) – a dictionary of ratios indexed by metrics
class airflow.operators.jdbc_operator.JdbcOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a database using jdbc driver.

Requires jaydebeapi.

Parameters:
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed. (templated)
  • jdbc_conn_id (str) – reference to a predefined database
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
class airflow.operators.latest_only_operator.LatestOnlyOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.models.skipmixin.SkipMixin

Allows a workflow to skip tasks that are not running during the most recent schedule interval.

If the task is run outside of the latest schedule interval, all directly downstream tasks will be skipped.

class airflow.operators.mssql_operator.MsSqlOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Microsoft SQL database

Parameters:
  • sql (str or string pointing to a template file with .sql extension. (templated)) – the sql code to be executed
  • mssql_conn_id (str) – reference to a specific mssql database
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
  • database (str) – name of database which overwrite defined one in connection
class airflow.operators.mssql_to_hive.MsSqlToHiveTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Microsoft SQL Server to Hive. The operator runs your query against Microsoft SQL Server, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the Microsoft SQL Server database. (templated)
  • hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values. (templated)
  • delimiter (str) – field delimiter in the file
  • mssql_conn_id (str) – source Microsoft SQL Server connection
  • hive_conn_id (str) – destination hive connection
  • tblproperties (dict) – TBLPROPERTIES of the hive table being created
class airflow.operators.mysql_operator.MySqlOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific MySQL database

Parameters:
  • sql (str or list[str]) – the sql code to be executed. Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in ‘.sql’ (templated)
  • mysql_conn_id (str) – reference to a specific mysql database
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
  • database (str) – name of database which overwrite defined one in connection
class airflow.operators.mysql_to_hive.MySqlToHiveTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from MySql to Hive. The operator runs your query against MySQL, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the MySQL database. (templated)
  • hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values. (templated)
  • delimiter (str) – field delimiter in the file
  • mysql_conn_id (str) – source mysql connection
  • hive_conn_id (str) – destination hive connection
  • tblproperties (dict) – TBLPROPERTIES of the hive table being created
class airflow.operators.oracle_operator.OracleOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Oracle database

Parameters:
  • sql (str or list[str]) – the sql code to be executed. Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in ‘.sql’ (templated)
  • oracle_conn_id (str) – reference to a specific Oracle database
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
class airflow.operators.pig_operator.PigOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes pig script.

Parameters:
  • pig (str) – the pig latin script to be executed. (templated)
  • pig_cli_conn_id (str) – reference to the Hive database
  • pigparams_jinja_translate (bool) – when True, pig params-type templating ${var} gets translated into jinja-type templating {{ var }}. Note that you may want to use this along with the DAG(user_defined_macros=myargs) parameter. View the DAG object documentation for more details.
class airflow.operators.postgres_operator.PostgresOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Postgres database

Parameters:
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed. (templated)
  • postgres_conn_id (str) – reference to a specific postgres database
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
  • database (str) – name of database which overwrite defined one in connection
class airflow.operators.presto_check_operator.PrestoCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator

Performs checks against Presto. The PrestoCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average.

This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alterts without stopping the progress of the DAG.

Parameters:
  • sql (str) – the sql to be executed
  • presto_conn_id (str) – reference to the Presto database
class airflow.operators.presto_check_operator.PrestoIntervalCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.IntervalCheckOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_threshold (dict) – a dictionary of ratios indexed by metrics
  • presto_conn_id (str) – reference to the Presto database
class airflow.operators.presto_to_mysql.PrestoToMySqlTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Presto to MySQL, note that for now the data is loaded into memory before being pushed to MySQL, so this operator should be used for smallish amount of data.

Parameters:
  • sql (str) – SQL query to execute against Presto. (templated)
  • mysql_table (str) – target MySQL table, use dot notation to target a specific database. (templated)
  • mysql_conn_id (str) – source mysql connection
  • presto_conn_id (str) – source presto connection
  • mysql_preoperator (str) – sql statement to run against mysql prior to import, typically use to truncate of delete in place of the data coming in, allowing the task to be idempotent (running the task twice won’t double load data). (templated)
class airflow.operators.presto_check_operator.PrestoValueCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.ValueCheckOperator

Performs a simple value check using sql code.

Parameters:
  • sql (str) – the sql to be executed
  • presto_conn_id (str) – reference to the Presto database
class airflow.operators.python_operator.PythonOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes a Python callable

See also

For more information on how to use this operator, take a look at the guide: PythonOperator

Parameters:
  • python_callable (python callable) – A reference to an object that is callable
  • op_kwargs (dict) – a dictionary of keyword arguments that will get unpacked in your function
  • op_args (list) – a list of positional arguments that will get unpacked when calling your callable
  • provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
  • templates_dict (dict[str]) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place and are made available in your callable’s context after the template has been applied. (templated)
  • templates_exts (list[str]) – a list of file extensions to resolve while processing templated fields, for examples ['.sql', '.hql']
class airflow.operators.python_operator.PythonVirtualenvOperator(**kwargs)[source]

Bases: airflow.operators.python_operator.PythonOperator

Allows one to run a function in a virtualenv that is created and destroyed automatically (with certain caveats).

The function must be defined using def, and not be part of a class. All imports must happen inside the function and no variables outside of the scope may be referenced. A global scope variable named virtualenv_string_args will be available (populated by string_args). In addition, one can pass stuff through op_args and op_kwargs, and one can use a return value. Note that if your virtualenv runs in a different Python major version than Airflow, you cannot use return values, op_args, or op_kwargs. You can use string_args though.

Parameters:
  • python_callable (function) – A python function with no references to outside variables, defined with def, which will be run in a virtualenv
  • requirements (list[str]) – A list of requirements as specified in a pip install command
  • python_version (str) – The Python version to run the virtualenv with. Note that both 2 and 2.7 are acceptable forms.
  • use_dill (bool) – Whether to use dill to serialize the args and result (pickle is default). This allow more complex types but requires you to include dill in your requirements.
  • system_site_packages (bool) – Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.
  • op_args – A list of positional arguments to pass to python_callable.
  • op_kwargs (dict) – A dict of keyword arguments to pass to python_callable.
  • string_args (list[str]) – Strings that are present in the global var virtualenv_string_args, available to python_callable at runtime as a list[str]. Note that args are split by newline.
  • templates_dict (dict of str) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place and are made available in your callable’s context after the template has been applied
  • templates_exts (list[str]) – a list of file extensions to resolve while processing templated fields, for examples ['.sql', '.hql']
class airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes an UNLOAD command to s3 as a CSV with headers

Parameters:
  • schema (str) – reference to a specific schema in redshift database
  • table (str) – reference to a specific table in redshift database
  • s3_bucket (str) – reference to a specific S3 bucket
  • s3_key (str) – reference to a specific S3 key
  • redshift_conn_id (str) – reference to a specific redshift database
  • aws_conn_id (str) – reference to a specific S3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  • unload_options (list) – reference to a list of UNLOAD options
class airflow.operators.s3_file_transform_operator.S3FileTransformOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copies data from a source S3 location to a temporary location on the local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location.

The locations of the source and the destination files in the local filesystem is provided as an first and second arguments to the transformation script. The transformation script is expected to read the data from source, transform it and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3.

S3 Select is also available to filter the source contents. Users can omit the transformation script if S3 Select expression is specified.

Parameters:
  • source_s3_key (str) – The key to be retrieved from S3. (templated)
  • source_aws_conn_id (str) – source s3 connection
  • source_verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

    This is also applicable to dest_verify.

  • dest_s3_key (str) – The key to be written from S3. (templated)
  • dest_aws_conn_id (str) – destination s3 connection
  • replace (bool) – Replace dest S3 key if it already exists
  • transform_script (str) – location of the executable transformation script
  • select_expression (str) – S3 Select expression
class airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata from.

Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the tables gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • s3_key (str) – The key to be retrieved from S3. (templated)
  • field_dict (dict) – A dictionary of the fields name in the file as keys and their Hive types as values
  • hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values. (templated)
  • headers (bool) – whether the file contains column names on the first line
  • check_headers (bool) – whether the column names on the first line should be checked against the keys of field_dict
  • wildcard_match (bool) – whether the s3_key should be interpreted as a Unix wildcard pattern
  • delimiter (str) – field delimiter in the file
  • aws_conn_id (str) – source s3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  • hive_cli_conn_id (str) – destination hive connection
  • input_compressed (bool) – Boolean to determine if file decompression is required to process headers
  • tblproperties (dict) – TBLPROPERTIES of the hive table being created
  • select_expression (str) – S3 Select expression
class airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes an COPY command to load files from s3 to Redshift

Parameters:
  • schema (str) – reference to a specific schema in redshift database
  • table (str) – reference to a specific table in redshift database
  • s3_bucket (str) – reference to a specific S3 bucket
  • s3_key (str) – reference to a specific S3 key
  • redshift_conn_id (str) – reference to a specific redshift database
  • aws_conn_id (str) – reference to a specific S3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  • copy_options (list) – reference to a list of COPY options
class airflow.operators.python_operator.ShortCircuitOperator(**kwargs)[source]

Bases: airflow.operators.python_operator.PythonOperator, airflow.models.skipmixin.SkipMixin

Allows a workflow to continue only if a condition is met. Otherwise, the workflow “short-circuits” and downstream tasks are skipped.

The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”. If the condition is True, downstream tasks proceed as normal.

The condition is determined by the result of python_callable.

class airflow.operators.http_operator.SimpleHttpOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Calls an endpoint on an HTTP system to execute an action

Parameters:
  • http_conn_id (str) – The connection to run the operator against
  • endpoint (str) – The relative part of the full url. (templated)
  • method (str) – The HTTP method to use, default = “POST”
  • data (For POST/PUT, depends on the content-type parameter, for GET a dictionary of key/value string pairs) – The data to pass. POST-data in POST/PUT and params in the URL for a GET request. (templated)
  • headers (a dictionary of string key/value pairs) – The HTTP headers to be added to the GET request
  • response_check (A lambda or defined function.) – A check against the ‘requests’ response object. Returns True for ‘pass’ and False otherwise.
  • extra_options (A dictionary of options, where key is string and value depends on the option that's being modified.) – Extra options for the ‘requests’ library, see the ‘requests’ documentation (options to modify timeout, ssl, etc.)
  • log_response (bool) – Log the response (default: False)
class airflow.operators.slack_operator.SlackAPIOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Base Slack Operator The SlackAPIPostOperator is derived from this operator. In the future additional Slack API Operators will be derived from this class as well

Parameters:
construct_api_call_params()[source]

Used by the execute function. Allows templating on the source fields of the api_call_params dict before construction

Override in child classes. Each SlackAPIOperator child class is responsible for having a construct_api_call_params function which sets self.api_call_params with a dict of API call parameters (https://api.slack.com/methods)

execute(**kwargs)[source]

SlackAPIOperator calls will not fail even if the call is not unsuccessful. It should not prevent a DAG from completing in success

class airflow.operators.slack_operator.SlackAPIPostOperator(**kwargs)[source]

Bases: airflow.operators.slack_operator.SlackAPIOperator

Posts messages to a slack channel

Parameters:
  • channel (str) – channel in which to post message on slack name (#general) or ID (C12318391). (templated)
  • username (str) – Username that airflow will be posting to Slack as. (templated)
  • text (str) – message to send to slack. (templated)
  • icon_url (str) – url to icon used for this message
  • attachments (array of hashes) – extra formatting details. (templated) - see https://api.slack.com/docs/attachments.
construct_api_call_params()[source]

Used by the execute function. Allows templating on the source fields of the api_call_params dict before construction

Override in child classes. Each SlackAPIOperator child class is responsible for having a construct_api_call_params function which sets self.api_call_params with a dict of API call parameters (https://api.slack.com/methods)

class airflow.operators.sqlite_operator.SqliteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Sqlite database

Parameters:
  • sql (str or string pointing to a template file. File must have a '.sql' extensions.) – the sql code to be executed. (templated)
  • sqlite_conn_id (str) – reference to a specific sqlite database
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
class airflow.operators.subdag_operator.SubDagOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This runs a sub dag. By convention, a sub dag’s dag_id should be prefixed by its parent and a dot. As in parent.child.

Parameters:
  • subdag (airflow.models.DAG) – the DAG object to run as a subdag of the current DAG.
  • dag (airflow.models.DAG) – the parent DAG for the subdag.
  • executor (airflow.executors.base_executor.BaseExecutor) – the executor for this subdag. Default to use SequentialExecutor. Please find AIRFLOW-74 for more details.
class airflow.operators.dagrun_operator.TriggerDagRunOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Triggers a DAG run for a specified dag_id

Parameters:
  • trigger_dag_id (str) – the dag_id to trigger (templated)
  • python_callable (python callable) – a reference to a python function that will be called while passing it the context object and a placeholder object obj for your callable to fill and return if you want a DagRun created. This obj object contains a run_id and payload attribute that you can modify in your function. The run_id should be a unique identifier for that DAG run, and the payload has to be a picklable object that will be made available to your tasks while executing that DAG run. Your function header should look like def foo(context, dag_run_obj):
  • execution_date (str or datetime.datetime) – Execution date for the dag (templated)
class airflow.operators.check_operator.ValueCheckOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Performs a simple value check using sql code.

Note that this is an abstract class and get_db_hook needs to be defined. Whereas a get_db_hook is hook that gets a single record from an external source.

Parameters:sql (str) – the sql to be executed. (templated)

Sensors

class airflow.sensors.external_task_sensor.ExternalTaskSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a different DAG or a task in a different DAG to complete for a specific execution_date

Parameters:
  • external_dag_id (str) – The dag_id that contains the task you want to wait for
  • external_task_id (str) – The task_id that contains the task you want to wait for. If None the sensor waits for the DAG
  • allowed_states (list) – list of allowed states, default is ['success']
  • execution_delta (datetime.timedelta) – time difference with the previous execution to look at, the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
  • execution_date_fn (callable) – function that receives the current execution date and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
  • check_existence (bool) – Set to True to check if the external task exists (when external_task_id is not None) or check if the DAG to wait for exists (when external_task_id is None), and immediately cease waiting if the external task or DAG does not exist (default value: False).
poke(**kwargs)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.hive_partition_sensor.HivePartitionSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a partition to show up in Hive.

Note: Because partition supports general logical operators, it can be inefficient. Consider using NamedHivePartitionSensor instead if you don’t need the full flexibility of HivePartitionSensor.

Parameters:
  • table (str) – The name of the table to wait for, supports the dot notation (my_database.my_table)
  • partition (str) – The partition clause to wait for. This is passed as is to the metastore Thrift client get_partitions_by_filter method, and apparently supports SQL like notation as in ds='2015-01-01' AND type='value' and comparison operators as in "ds>=2015-01-01"
  • metastore_conn_id (str) – reference to the metastore thrift service connection id
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.http_sensor.HttpSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Executes a HTTP GET statement and returns False on failure caused by 404 Not Found or response_check returning False.

HTTP Error codes other than 404 (like 403) or Connection Refused Error would fail the sensor itself directly (no more poking).

Parameters:
  • http_conn_id (str) – The connection to run the sensor against
  • method (str) – The HTTP request method to use
  • endpoint (str) – The relative part of the full url
  • request_params (a dictionary of string key/value pairs) – The parameters to be added to the GET url
  • headers (a dictionary of string key/value pairs) – The HTTP headers to be added to the GET request
  • response_check (A lambda or defined function.) – A check against the ‘requests’ response object. Returns True for ‘pass’ and False otherwise.
  • extra_options (A dictionary of options, where key is string and value depends on the option that's being modified.) – Extra options for the ‘requests’ library, see the ‘requests’ documentation (options to modify timeout, ssl, etc.)
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.metastore_partition_sensor.MetastorePartitionSensor(**kwargs)[source]

Bases: airflow.sensors.sql_sensor.SqlSensor

An alternative to the HivePartitionSensor that talk directly to the MySQL db. This was created as a result of observing sub optimal queries generated by the Metastore thrift service when hitting subpartitioned tables. The Thrift service’s queries were written in a way that wouldn’t leverage the indexes.

Parameters:
  • schema (str) – the schema
  • table (str) – the table
  • partition_name (str) – the partition name, as defined in the PARTITIONS table of the Metastore. Order of the fields does matter. Examples: ds=2016-01-01 or ds=2016-01-01/sub=foo for a sub partitioned table
  • mysql_conn_id (str) – a reference to the MySQL conn_id for the metastore
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.named_hive_partition_sensor.NamedHivePartitionSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a set of partitions to show up in Hive.

Parameters:
  • partition_names (list[str]) – List of fully qualified names of the partitions to wait for. A fully qualified name is of the form schema.table/pk1=pv1/pk2=pv2, for example, default.users/ds=2016-01-01. This is passed as is to the metastore Thrift client get_partitions_by_name method. Note that you cannot use logical or comparison operators as in HivePartitionSensor.
  • metastore_conn_id (str) – reference to the metastore thrift service connection id
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.s3_key_sensor.S3KeySensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a key (a file-like instance on S3) to be present in a S3 bucket. S3 being a key/value it does not support folders. The path is just a key a resource.

Parameters:
  • bucket_key (str) – The key being waited on. Supports full s3:// style url or relative path from root level. When it’s specified as a full s3:// url, please leave bucket_name as None.
  • bucket_name (str) – Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url.
  • wildcard_match (bool) – whether the bucket_key should be interpreted as a Unix wildcard pattern
  • aws_conn_id (str) – a reference to the s3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.s3_prefix_sensor.S3PrefixSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a prefix to exist. A prefix is the first part of a key, thus enabling checking of constructs similar to glob airfl* or SQL LIKE ‘airfl%’. There is the possibility to precise a delimiter to indicate the hierarchy or keys, meaning that the match will stop at that delimiter. Current code accepts sane delimiters, i.e. characters that are NOT special characters in the Python regex engine.

Parameters:
  • bucket_name (str) – Name of the S3 bucket
  • prefix (str) – The prefix being waited on. Relative path from bucket root level.
  • delimiter (str) – The delimiter intended to show hierarchy. Defaults to ‘/’.
  • aws_conn_id (str) – a reference to the s3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.sql_sensor.SqlSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Runs a sql statement until a criteria is met. It will keep trying while sql returns no row, or if the first cell in (0, ‘0’, ‘’).

Parameters:
  • conn_id (str) – The connection to run the sensor against
  • sql (str) – The sql to run. To pass, it needs to return at least one cell that contains a non-zero / empty string value.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.time_delta_sensor.TimeDeltaSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a timedelta after the task’s execution_date + schedule_interval. In Airflow, the daily task stamped with execution_date 2016-01-01 can only start running on 2016-01-02. The timedelta here represents the time after the execution period has closed.

Parameters:delta (datetime.timedelta) – time length to wait after execution_date before succeeding
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.time_sensor.TimeSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits until the specified time of the day.

Parameters:target_time (datetime.time) – time after which the job succeeds
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.sensors.web_hdfs_sensor.WebHdfsSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a file or folder to land in HDFS

poke(context)[source]

Function that the sensors defined while deriving this class should override.

Community-contributed Operators

Operators

class airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that submit presto query to athena.

Parameters:
  • query (str) – Presto to be run on athena. (templated)
  • database (str) – Database to select. (templated)
  • output_location (str) – s3 path to write the query results into. (templated)
  • aws_conn_id (str) – aws connection to use
  • sleep_time (int) – Time to wait between two consecutive call to check query status on athena
execute(context)[source]

Run Presto Query on Athena

on_kill()[source]

Cancel the submitted athena query

class airflow.contrib.operators.awsbatch_operator.AWSBatchOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a job on AWS Batch Service

Parameters:
class airflow.contrib.operators.adls_to_gcs.AdlsToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator

Synchronizes an Azure Data Lake Storage path with a GCS bucket

Parameters:
  • src_adls (str) – The Azure Data Lake path to find the objects (templated)
  • dest_gcs (str) – The Google Cloud Storage bucket and prefix to store the objects. (templated)
  • replace (bool) – If true, replaces same-named files in GCS
  • azure_data_lake_conn_id (str) – The connection ID to use when connecting to Azure Data Lake Storage.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Examples:

The following Operator would copy a single file named hello/world.avro from ADLS to the GCS bucket mybucket. Its full resulting gcs path will be gs://mybucket/hello/world.avro

copy_single_file = AdlsToGoogleCloudStorageOperator(
    task_id='copy_single_file',
    src_adls='hello/world.avro',
    dest_gcs='gs://mybucket',
    replace=False,
    azure_data_lake_conn_id='azure_data_lake_default',
    google_cloud_storage_conn_id='google_cloud_default'
)

The following Operator would copy all parquet files from ADLS to the GCS bucket mybucket.

   copy_all_files = AdlsToGoogleCloudStorageOperator(
       task_id='copy_all_files',
       src_adls='*.parquet',
       dest_gcs='gs://mybucket',
       replace=False,
       azure_data_lake_conn_id='azure_data_lake_default',
       google_cloud_storage_conn_id='google_cloud_default'
   )

The following Operator would copy all parquet files from ADLS
path ``/hello/world``to the GCS bucket ``mybucket``. ::

   copy_world_files = AdlsToGoogleCloudStorageOperator(
       task_id='copy_world_files',
       src_adls='hello/world/*.parquet',
       dest_gcs='gs://mybucket',
       replace=False,
       azure_data_lake_conn_id='azure_data_lake_default',
       google_cloud_storage_conn_id='google_cloud_default'
   )
class airflow.contrib.operators.azure_container_instances_operator.AzureContainerInstancesOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a container on Azure Container Instances

Parameters:
  • ci_conn_id (str) – connection id of a service principal which will be used to start the container instance
  • registry_conn_id (str) – connection id of a user which can login to a private docker registry. If None, we assume a public registry
  • resource_group (str) – name of the resource group wherein this container instance should be started
  • name (str) – name of this container instance. Please note this name has to be unique in order to run containers in parallel.
  • image (str) – the docker image to be used
  • region (str) – the region wherein this container instance should be started
  • environment_variables (dict) – key,value pairs containing environment variables which will be passed to the running container
  • volumes (list[<conn_id, account_name, share_name, mount_path, read_only>]) – list of volumes to be mounted to the container. Currently only Azure Fileshares are supported.
  • memory_in_gb (double) – the amount of memory to allocate to this container
  • cpu (double) – the number of cpus to allocate to this container
  • command (str) – the command to run inside the container
Example:
>>>  a = AzureContainerInstancesOperator(
            'azure_service_principal',
            'azure_registry_user',
            'my-resource-group',
            'my-container-name-{{ ds }}',
            'myprivateregistry.azurecr.io/my_container:latest',
            'westeurope',
            {'EXECUTION_DATE': '{{ ds }}'},
            [('azure_wasb_conn_id',
              'my_storage_container',
              'my_fileshare',
              '/input-data',
              True),],
            memory_in_gb=14.0,
            cpu=4.0,
            command='python /app/myfile.py',
            task_id='start_container'
        )
class airflow.contrib.operators.azure_cosmos_operator.AzureCosmosInsertDocumentOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Inserts a new document into the specified Cosmos database and collection It will create both the database and collection if they do not already exist

Parameters:
  • database_name (str) – The name of the database. (templated)
  • collection_name (str) – The name of the collection. (templated)
  • document (dict) – The document to insert
  • azure_cosmos_conn_id (str) – reference to a CosmosDB connection.
class airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

List all files from the specified path

This operator returns a python list with the names of files which can be used by
xcom in the downstream tasks.
Parameters:
  • path (str) – The Azure Data Lake path to find the objects. Supports glob strings (templated)
  • azure_data_lake_conn_id (str) – The connection ID to use when connecting to Azure Data Lake Storage.
Example:

The following Operator would list all the Parquet files from folder/output/ folder in the specified ADLS account

adls_files = AzureDataLakeStorageListOperator(
    task_id='adls_files',
    path='folder/output/*.parquet',
    azure_data_lake_conn_id='azure_data_lake_default'
)
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator

Performs checks against BigQuery. The BigQueryCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average.

This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alterts without stopping the progress of the DAG.

Parameters:
  • sql (str) – the sql to be executed
  • bigquery_conn_id (str) – reference to the BigQuery database
  • use_legacy_sql (bool) – Whether to use legacy SQL (true) or standard SQL (false).
class airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This operator is used to create new dataset for your Project in Big query. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

Parameters:
  • project_id (str) – The name of the project where we want to create the dataset. Don’t need to provide, if projectId in dataset_reference.
  • dataset_id (str) – The id of dataset. Don’t need to provide, if datasetId in dataset_reference.
  • dataset_reference – Dataset reference that could be provided with request body. More info: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
class airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new, empty table in the specified BigQuery dataset, optionally with schema.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it. You can also create a table without schema.

Parameters:
  • project_id (str) – The project to create the table into. (templated)
  • dataset_id (str) – The dataset to create the table into. (templated)
  • table_id (str) – The Name of the table to be created. (templated)
  • schema_fields (list) –

    If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    Example:

    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    
  • gcs_schema_object (str) – Full path to the JSON file containing schema (templated). For example: gs://test-bucket/dir1/dir2/employee_schema.json
  • time_partitioning (dict) –

    configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications.

  • bigquery_conn_id (str) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (str) – Reference to a specific Google cloud storage hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • labels (dict) –

    a dictionary containing labels for the table, passed to BigQuery

    Example (with schema JSON in GCS):

    CreateTable = BigQueryCreateEmptyTableOperator(
        task_id='BigQueryCreateEmptyTableOperator_task',
        dataset_id='ODS',
        table_id='Employees',
        project_id='internal-gcp-project',
        gcs_schema_object='gs://schema-bucket/employee_schema.json',
        bigquery_conn_id='airflow-service-account',
        google_cloud_storage_conn_id='airflow-service-account'
    )
    

    Corresponding Schema file (employee_schema.json):

    [
      {
        "mode": "NULLABLE",
        "name": "emp_name",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "salary",
        "type": "INTEGER"
      }
    ]
    

    Example (with schema in the DAG):

    CreateTable = BigQueryCreateEmptyTableOperator(
        task_id='BigQueryCreateEmptyTableOperator_task',
        dataset_id='ODS',
        table_id='Employees',
        project_id='internal-gcp-project',
        schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                       {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
        bigquery_conn_id='airflow-service-account',
        google_cloud_storage_conn_id='airflow-service-account'
    )
    
class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new external table in the dataset with the data in Google Cloud Storage.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.

Parameters:
  • bucket (str) – The bucket to point the external table to. (templated)
  • source_objects (list) – List of Google cloud storage URIs to point table to. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
  • destination_project_dataset_table (str) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into (templated). If <project> is not included, project will be the project defined in the connection json.
  • schema_fields (list) –

    If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    Example:

    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    

    Should not be set when source_format is ‘DATASTORE_BACKUP’.

  • schema_object (str) – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
  • source_format (str) – File format of the data.
  • compression (str) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
  • skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
  • field_delimiter (str) – The delimiter to use for the CSV.
  • max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
  • quote_character (str) – The value that is used to quote data sections in a CSV file.
  • allow_quoted_newlines (bool) – Whether to allow quoted newlines (true) or not (false).
  • allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
  • bigquery_conn_id (str) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (str) – Reference to a specific Google cloud storage hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • src_fmt_configs (dict) – configure optional fields specific to the source format
  • labels (dict) – a dictionary containing labels for the table, passed to BigQuery
class airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This operator deletes an existing dataset from your Project in Big query. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete

Parameters:
  • project_id (str) – The project id of the dataset.
  • dataset_id (str) – The dataset to be deleted.

Example:

delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 'temp-dataset',
                                                 project_id = 'temp-project',
                                                 bigquery_conn_id='_my_gcp_conn_',
                                                 task_id='Deletetemp',
                                                 dag=dag)
class airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Fetches the data from a BigQuery table (alternatively fetch data for selected columns) and returns data in a python list. The number of elements in the returned list will be equal to the number of rows fetched. Each element in the list will again be a list where element would represent the columns values for that row.

Example Result: [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]

Note

If you pass fields to selected_fields which are in different order than the order of columns already in BQ table, the data will still be in the order of BQ table. For example if the BQ table has 3 columns as [A,B,C] and you pass ‘B,A’ in the selected_fields the data would still be of the form 'A,B'.

Example:

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)
Parameters:
  • dataset_id (str) – The dataset ID of the requested table. (templated)
  • table_id (str) – The table ID of the requested table. (templated)
  • max_results (str) – The maximum number of records (rows) to be fetched from the table. (templated)
  • selected_fields (str) – List of fields to return (comma-separated). If unspecified, all fields are returned.
  • bigquery_conn_id (str) – reference to a specific BigQuery hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.IntervalCheckOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

This method constructs a query like so

SELECT {metrics_threshold_dict_key} FROM {table}
WHERE {date_filter_column}=<date>
Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_threshold (dict) – a dictionary of ratios indexed by metrics, for example ‘COUNT(*)’: 1.5 would require a 50 percent or less difference between the current day, and the prior days_back.
  • use_legacy_sql (bool) – Whether to use legacy SQL (true) or standard SQL (false).
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes BigQuery SQL queries in a specific BigQuery database

Parameters:
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'.) – the sql code to be executed (templated)
  • destination_dataset_table (str) – A dotted (<project>.|<project>:)<dataset>.<table> that, if set, will store the results of the query. (templated)
  • write_disposition (str) – Specifies the action that occurs if the destination table already exists. (default: ‘WRITE_EMPTY’)
  • create_disposition (str) – Specifies whether the job is allowed to create new tables. (default: ‘CREATE_IF_NEEDED’)
  • allow_large_results (bool) – Whether to allow large results.
  • flatten_results (bool) – If true and query uses legacy SQL dialect, flattens all nested and repeated fields in the query results. allow_large_results must be true if this is set to false. For standard SQL queries, this flag is ignored and results are never flattened.
  • bigquery_conn_id (str) – reference to a specific BigQuery hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • udf_config (list) – The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details.
  • use_legacy_sql (bool) – Whether to use legacy SQL (true) or standard SQL (false).
  • maximum_billing_tier (int) – Positive integer that serves as a multiplier of the basic price. Defaults to None, in which case it uses the value set in the project.
  • maximum_bytes_billed (float) – Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default.
  • api_resource_configs (dict) – a dictionary that contain params ‘configuration’ applied for Google BigQuery Jobs API: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs for example, {‘query’: {‘useQueryCache’: False}}. You could use it if you need to provide some params that are not supported by BigQueryOperator like args.
  • schema_update_options (tuple) – Allows the schema of the destination table to be updated as a side effect of the load job.
  • query_params (dict) – a dictionary containing query parameter types and values, passed to BigQuery.
  • labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
  • priority (str) – Specifies a priority for the query. Possible values include INTERACTIVE and BATCH. The default value is INTERACTIVE.
  • time_partitioning (dict) – configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications.
  • cluster_fields (list[str]) – Request that the result of this query be stored sorted by one or more columns. This is only available in conjunction with time_partitioning. The order of columns given determines the sort order.
  • location (str) – The geographic location of the job. Required except for US and EU. See details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location
class airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes BigQuery tables

Parameters:
  • deletion_dataset_table (str) – A dotted (<project>.|<project>:)<dataset>.<table> that indicates which table will be deleted. (templated)
  • bigquery_conn_id (str) – reference to a specific BigQuery hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • ignore_if_missing (bool) – if True, then return success even if the requested table does not exist.
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copies data from one BigQuery table to another.

See also

For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy

Parameters:
  • source_project_dataset_tables (list|string) – One or more dotted (project:|project.)<dataset>.<table> BigQuery tables to use as the source data. If <project> is not included, project will be the project defined in the connection json. Use a list if there are multiple source tables. (templated)
  • destination_project_dataset_table (str) – The destination BigQuery table. Format is: (project:|project.)<dataset>.<table> (templated)
  • write_disposition (str) – The write disposition if the table already exists.
  • create_disposition (str) – The create disposition if the table doesn’t exist.
  • bigquery_conn_id (str) – reference to a specific BigQuery hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Transfers a BigQuery table to a Google Cloud Storage bucket.

See also

For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs

Parameters:
  • source_project_dataset_table (str) – The dotted (<project>.|<project>:)<dataset>.<table> BigQuery table to use as the source data. If <project> is not included, project will be the project defined in the connection json. (templated)
  • destination_cloud_storage_uris (list) – The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). (templated) Follows convention defined here: https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
  • compression (str) – Type of compression to use.
  • export_format (str) – File format to export.
  • field_delimiter (str) – The delimiter to use when extracting to a CSV.
  • print_header (bool) – Whether to print a header for a CSV file extract.
  • bigquery_conn_id (str) – reference to a specific BigQuery hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
class airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.ValueCheckOperator

Performs a simple value check using sql code.

Parameters:
  • sql (str) – the sql to be executed
  • use_legacy_sql (bool) – Whether to use legacy SQL (true) or standard SQL (false).
class airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Updates a Cloud Bigtable cluster.

For more details about updating a Cloud Bigtable cluster, have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/cluster.html#google.cloud.bigtable.cluster.Cluster.update

See also

For more information on how to use this operator, take a look at the guide: BigtableClusterUpdateOperator

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance.
  • cluster_id (str) – The ID of the Cloud Bigtable cluster to update.
  • nodes (int) – The desired number of nodes for the Cloud Bigtable cluster.
  • project_id (str) – Optional, the ID of the GCP project.
class airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Creates a new Cloud Bigtable instance. If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration and immediately succeeds. No changes are made to the existing instance.

For more details about instance creation have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.create

See also

For more information on how to use this operator, take a look at the guide: BigtableInstanceCreateOperator

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance to create.
  • main_cluster_id (str) – The ID for main cluster for the new instance.
  • main_cluster_zone (str) – The zone for main cluster See https://cloud.google.com/bigtable/docs/locations for more details.
  • project_id (str) – Optional, the ID of the GCP project. If set to None or missing, the default project_id from the GCP connection is used.
  • replica_cluster_id (str) – (optional) The ID for replica cluster for the new instance.
  • replica_cluster_zone (str) – (optional) The zone for replica cluster.
  • instance_type (enums.IntEnum) – (optional) The type of the instance.
  • instance_display_name (str) – (optional) Human-readable name of the instance. Defaults to instance_id.
  • instance_labels (dict) – (optional) Dictionary of labels to associate with the instance.
  • cluster_nodes (int) – (optional) Number of nodes for cluster.
  • cluster_storage_type (enums.IntEnum) – (optional) The type of storage.
  • timeout (int) – (optional) timeout (in seconds) for instance creation. If None is not specified, Operator will wait indefinitely.
class airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Deletes the Cloud Bigtable instance, including its clusters and all related tables.

For more details about deleting instance have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.delete

See also

For more information on how to use this operator, take a look at the guide: BigtableInstanceDeleteOperator

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance to delete.
  • project_id (str) – Optional, the ID of the GCP project. If set to None or missing, the default project_id from the GCP connection is used.
class airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Creates the table in the Cloud Bigtable instance.

For more details about creating table have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.create

See also

For more information on how to use this operator, take a look at the guide: BigtableTableCreateOperator

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance that will hold the new table.
  • table_id (str) – The ID of the table to be created.
  • project_id (str) – Optional, the ID of the GCP project. If set to None or missing, the default project_id from the GCP connection is used.
  • initial_split_keys (list) – (Optional) list of row keys in bytes that will be used to initially split the table into several tablets.
  • column_families (dict) – (Optional) A map columns to create. The key is the column_id str and the value is a google.cloud.bigtable.column_family.GarbageCollectionRule
class airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Deletes the Cloud Bigtable table.

For more details about deleting table have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.delete

See also

For more information on how to use this operator, take a look at the guide: BigtableTableDeleteOperator

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance.
  • table_id (str) – The ID of the table to be deleted.
  • project_id (str) – Optional, the ID of the GCP project. If set to None or missing, the default project_id from the GCP connection is used.
Parm app_profile_id:
 

Application profile.

class airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator, airflow.contrib.operators.gcp_bigtable_operator.BigtableValidationMixin

Sensor that waits for Cloud Bigtable table to be fully replicated to its clusters. No exception will be raised if the instance or the table does not exist.

For more details about cluster states for a table, have a look at the reference: https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.get_cluster_states

See also

For more information on how to use this operator, take a look at the guide: BigtableTableWaitForReplicationSensor

Parameters:
  • instance_id (str) – The ID of the Cloud Bigtable instance.
  • table_id (str) – The ID of the table to check replication status.
  • project_id (str) – Optional, the ID of the GCP project.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.operators.cassandra_to_gcs.CassandraToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copy data from Cassandra to Google cloud storage in JSON format

Note: Arrays of arrays are not supported.

classmethod convert_map_type(name, value)[source]

Converts a map to a repeated RECORD that contains two fields: ‘key’ and ‘value’, each will be converted to its corresopnding data type in BQ.

classmethod convert_tuple_type(name, value)[source]

Converts a tuple to RECORD that contains n fields, each will be converted to its corresponding data type in bq and will be named ‘field_<index>’, where index is determined by the order of the tuple elments defined in cassandra.

classmethod convert_user_type(name, value)[source]

Converts a user type to RECORD that contains n fields, where n is the number of attributes. Each element in the user type class will be converted to its corresponding data type in BQ.

class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes a Cloud Spanner database.

See also

For more information on how to use this operator, take a look at the guide: CloudSpannerInstanceDatabaseDeleteOperator

Parameters:
  • instance_id (str) – Cloud Spanner instance ID.
  • database_id (str) – Cloud Spanner database ID.
  • project_id (str) – Optional, the ID of the project that owns the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeployOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new Cloud Spanner database, or if database exists, the operator does nothing.

See also

For more information on how to use this operator, take a look at the guide: CloudSpannerInstanceDatabaseDeployOperator

Parameters:
  • instance_id (str) – The Cloud Spanner instance ID.
  • database_id (str) – The Cloud Spanner database ID.
  • ddl_statements (list[str]) – The string list containing DDL for the new database.
  • project_id (str) – Optional, the ID of the project that owns the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseQueryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes an arbitrary DML query (INSERT, UPDATE, DELETE).

See also

For more information on how to use this operator, take a look at the guide: CloudSpannerInstanceDatabaseQueryOperator

Parameters:
  • instance_id (str) – The Cloud Spanner instance ID.
  • database_id (str) – The Cloud Spanner database ID.
  • query (str or list) – The query or list of queries to be executed. Can be a path to a SQL file.
  • project_id (str) – Optional, the ID of the project that owns the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseUpdateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Updates a Cloud Spanner database with the specified DDL statement.

See also

For more information on how to use this operator, take a look at the guide: CloudSpannerInstanceDatabaseUpdateOperator

Parameters:
  • instance_id (str) – The Cloud Spanner instance ID.
  • database_id (str) – The Cloud Spanner database ID.
  • ddl_statements (list[str]) – The string list containing DDL to apply to the database.
  • project_id (str) – Optional, the ID of the project that owns the the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • operation_id (str) – (Optional) Unique per database operation id that can be specified to implement idempotency check.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes a Cloud Spanner instance. If an instance does not exist, no action is taken and the operator succeeds.

See also

For more information on how to use this operator, take a look at the guide: CloudSpannerInstanceDeleteOperator

Parameters:
  • instance_id (str) – The Cloud Spanner instance ID.
  • project_id (str) – Optional, the ID of the project that owns the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new Cloud Spanner instance, or if an instance with the same instance_id exists in the specified project, updates the Cloud Spanner instance.

Parameters:
  • instance_id (str) – Cloud Spanner instance ID.
  • configuration_name (str) – The name of the Cloud Spanner instance configuration defining how the instance will be created. Required for instances that do not yet exist.
  • node_count (int) – (Optional) The number of nodes allocated to the Cloud Spanner instance.
  • display_name (str) – (Optional) The display name for the Cloud Spanner instance in the GCP Console. (Must be between 4 and 30 characters.) If this value is not set in the constructor, the name is the same as the instance ID.
  • project_id (str) – Optional, the ID of the project which owns the Cloud Spanner Database. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Abstract base operator for Google Cloud SQL operators to inherit from.

Parameters:
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • project_id (str) – Optional, Google Cloud Platform Project ID. f set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Creates a new Cloud SQL instance. If an instance with the same name exists, no action will be taken and the operator will succeed.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceCreateOperator

Parameters:
  • body (dict) – Body required by the Cloud SQL insert API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert #request-body
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
  • validate_body (bool) – True if body should be validated, False otherwise.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Creates a new database inside a Cloud SQL instance.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceDatabaseCreateOperator

Parameters:
  • instance (str) – Database instance ID. This does not include the project ID.
  • body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
  • validate_body (bool) – Whether the body should be validated. Defaults to True.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Deletes a database from a Cloud SQL instance.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceDatabaseDeleteOperator

Parameters:
  • instance (str) – Database instance ID. This does not include the project ID.
  • database (str) – Name of the database to be deleted in the instance.
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceDatabasePatchOperator

Parameters:
  • instance (str) – Database instance ID. This does not include the project ID.
  • database (str) – Name of the database to be updated in the instance.
  • body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body
  • project_id (str) – Optional, Google Cloud Platform Project ID.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
  • validate_body (bool) – Whether the body should be validated. Defaults to True.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Deletes a Cloud SQL instance.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceDeleteOperator

Parameters:
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

Note: This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overridden.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceImportOperator

Parameters:
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
  • validate_body (bool) – Whether the body should be validated. Defaults to True.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

CSV IMPORT:

This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints the duplicate import may result in an error.

SQL IMPORT:

This operator is idempotent for a SQL import if it was also exported by Cloud SQL. The exported SQL contains ‘DROP TABLE IF EXISTS’ statements for all tables to be imported.

If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured on the SQL file level.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstanceImportOperator

Parameters:
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
  • validate_body (bool) – Whether the body should be validated. Defaults to True.
class airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator

Updates settings of a Cloud SQL instance.

Caution: This is a partial update, so only included values for the settings will be updated.

In the request body, supply the relevant portions of an instance resource, according to the rules of patch semantics. https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

See also

For more information on how to use this operator, take a look at the guide: CloudSqlInstancePatchOperator

Parameters:
  • body (dict) – Body required by the Cloud SQL patch API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
  • instance (str) – Cloud SQL instance ID. This does not include the project ID.
  • project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
  • api_version (str) – API version used (e.g. v1beta4).
class airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Performs DML or DDL query on an existing Cloud Sql instance. It optionally uses cloud-sql-proxy to establish secure connection with the database.

See also

For more information on how to use this operator, take a look at the guide: CloudSqlQueryOperator

Parameters:
  • sql (str or list[str]) – SQL query or list of queries to run (should be DML or DDL query - this operator does not return any data from the database, so it is useless to pass it DQL queries. Note that it is responsibility of the author of the queries to make sure that the queries are idempotent. For example you can use CREATE TABLE IF NOT EXISTS to create a table.
  • parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
  • autocommit (bool) – if True, each command is automatically committed. (default value: False)
  • gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform for cloud-sql-proxy authentication.
  • gcp_cloudsql_conn_id (str) – The connection ID used to connect to Google Cloud SQL its schema should be gcpcloudsql://. See CloudSqlDatabaseHook for details on how to define gcpcloudsql:// connection.
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Example:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date':
        (2016, 8, 1),
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'

    },
    dag=dag)

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters:
  • jar (str) – The reference to a self executing DataFlow jar (templated).
  • job_name (str) – The ‘jobName’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.
  • dataflow_default_options (dict) – Map of default job options.
  • options (dict) – Map of job specific options.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
  • job_class (str) – The name of the dataflow job class to be executued, it is often not the main class configured in the dataflow jar file.

jar, options, and job_name are templated so you can use variables in them.

Note that both dataflow_default_options and options will be merged to specify pipeline execution parameter, and dataflow_default_options is expected to save high-level options, for instances, project and zone information, which apply to all dataflow operators in the DAG.

It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}

You need to pass the path to your dataflow as a file reference with the jar parameter, the jar needs to be a self executing jar (see documentation here: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass on options to your job.

t1 = DataFlowJavaOperator(
    task_id='datapflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY',
        'labels': {'foo' : 'bar'}
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my-dag)
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Launching Cloud Dataflow jobs written in python. Note that both dataflow_default_options and options will be merged to specify pipeline execution parameter, and dataflow_default_options is expected to save high-level options, for instances, project and zone information, which apply to all dataflow operators in the DAG.

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters:
  • py_file (str) – Reference to the python dataflow pipeline file.py, e.g., /some/local/file/path/to/your/python/pipeline/file.
  • job_name (str) – The ‘job_name’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' or 'job_name' in options will be overwritten.
  • py_options – Additional python options, e.g., [“-m”, “-v”].
  • dataflow_default_options (dict) – Map of default job options.
  • options (dict) – Map of job specific options.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
execute(context)[source]

Execute the python dataflow job.

class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Hadoop Job on a Cloud DataProc cluster.

Parameters:
  • main_jar (str) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
  • main_class (str) – Name of the job class. (use this or the main_jar, not both together).
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (str) – The name of the DataProc cluster. (templated)
  • dataproc_hadoop_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_hadoop_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.dataproc_operator.DataProcHiveOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Hive query Job on a Cloud DataProc cluster.

Parameters:
  • query (str) – The query or reference to the query file (q extension).
  • query_uri (str) – The uri of a hive script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query.
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes.
  • cluster_name (str) – The name of the DataProc cluster.
  • dataproc_hive_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_hive_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.dataproc_operator.DataProcPigOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.

It’s a good practice to define dataproc_* parameters in the default_args of the dag like the cluster name and UDFs.

default_args = {
    'cluster_name': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}

You can pass a pig script as string or file reference. Use variables to pass on variables for the pig script to be resolved on the cluster or use the parameters to be resolved in the script as template parameters.

Example:

t1 = DataProcPigOperator(
        task_id='dataproc_pig',
        query='a_pig_script.pig',
        variables={'out': 'gs://example/output/{{ds}}'},
        dag=dag)

See also

For more detail on about job submission have a look at the reference: https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs

Parameters:
  • query (str) – The query or reference to the query file (pg or pig extension). (templated)
  • query_uri (str) – The uri of a pig script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query. (templated)
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (str) – The name of the DataProc cluster. (templated)
  • dataproc_pig_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_pig_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a PySpark Job on a Cloud DataProc cluster.

Parameters:
  • main (str) – [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main Python file to use as the driver. Must be a .py file.
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • pyfiles (list) – List of Python files to pass to the PySpark framework. Supported file types: .py, .egg, and .zip
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (str) – The name of the DataProc cluster.
  • dataproc_pyspark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_pyspark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.dataproc_operator.DataProcSparkOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Spark Job on a Cloud DataProc cluster.

Parameters:
  • main_jar (str) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
  • main_class (str) – Name of the job class. (use this or the main_jar, not both together).
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (str) – The name of the DataProc cluster. (templated)
  • dataproc_spark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Spark SQL query Job on a Cloud DataProc cluster.

Parameters:
  • query (str) – The query or reference to the query file (q extension). (templated)
  • query_uri (str) – The uri of a spark sql script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query. (templated)
  • job_name (str) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (str) – The name of the DataProc cluster. (templated)
  • dataproc_spark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (str) – The specified region where the dataproc cluster is created.
  • job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. Eg, if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables:

dataproc_job_id (str) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.

class airflow.contrib.operators.databricks_operator.DatabricksRunNowOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Runs an existing Spark job run to Databricks using the api/2.0/jobs/run-now API endpoint.

There are two ways to instantiate this operator.

In the first way, you can take the JSON payload that you typically use to call the api/2.0/jobs/run-now endpoint and pass it directly to our DatabricksRunNowOperator through the json parameter. For example

json = {
  "job_id": 42,
  "notebook_params": {
    "dry-run": "true",
    "oldest-time-to-consider": "1457570074236"
  }
}

notebook_run = DatabricksRunNowOperator(task_id='notebook_run', json=json)

Another way to accomplish the same thing is to use the named parameters of the DatabricksRunNowOperator directly. Note that there is exactly one named parameter for each top level parameter in the run-now endpoint. In this method, your code would look like this:

job_id=42

notebook_params = {
    "dry-run": "true",
    "oldest-time-to-consider": "1457570074236"
}

python_params = ["douglas adams", "42"]

spark_submit_params = ["--class", "org.apache.spark.examples.SparkPi"]

notebook_run = DatabricksRunNowOperator(
    job_id=job_id,
    notebook_params=notebook_params,
    python_params=python_params,
    spark_submit_params=spark_submit_params
)

In the case where both the json parameter AND the named parameters are provided, they will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.

Currently the named parameters that DatabricksRunNowOperator supports are
  • job_id
  • json
  • notebook_params
  • python_params
  • spark_submit_params
Parameters:
  • job_id (str) –

    the job_id of the existing Databricks job. This field will be templated.

  • json (dict) –

    A JSON object containing API parameters which will be passed directly to the api/2.0/jobs/run-now endpoint. The other named parameters (i.e. notebook_params, spark_submit_params..) to this operator will be merged with this json dictionary if they are provided. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys. (templated)

    See also

    For more information about templating see Jinja Templating. https://docs.databricks.com/api/latest/jobs.html#run-now

  • notebook_params (dict) –

    A dict from keys to values for jobs with notebook task, e.g. “notebook_params”: {“name”: “john doe”, “age”: “35”}. The map is passed to the notebook and will be accessible through the dbutils.widgets.get function. See Widgets for more information. If not specified upon run-now, the triggered run will use the job’s base parameters. notebook_params cannot be specified in conjunction with jar_params. The json representation of this field (i.e. {“notebook_params”:{“name”:”john doe”,”age”:”35”}}) cannot exceed 10,000 bytes. This field will be templated.

  • python_params (list[str]) –

    A list of parameters for jobs with python tasks, e.g. “python_params”: [“john doe”, “35”]. The parameters will be passed to python file as command line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The json representation of this field (i.e. {“python_params”:[“john doe”,”35”]}) cannot exceed 10,000 bytes. This field will be templated.

  • spark_submit_params (list[str]) –

    A list of parameters for jobs with spark submit task, e.g. “spark_submit_params”: [“–class”, “org.apache.spark.examples.SparkPi”]. The parameters will be passed to spark-submit script as command line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The json representation of this field cannot exceed 10,000 bytes. This field will be templated.

  • timeout_seconds (int32) – The timeout for this run. By default a value of 0 is used which means to have no timeout. This field will be templated.
  • databricks_conn_id (str) – The name of the Airflow connection to use. By default and in the common case this will be databricks_default. To use token based authentication, provide the key token in the extra field for the connection.
  • polling_period_seconds (int) – Controls the rate which we poll for the result of this run. By default the operator will poll every 30 seconds.
  • databricks_retry_limit (int) – Amount of times retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1.
  • do_xcom_push (bool) – Whether we should push run_id and run_page_url to xcom.
class airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Submits a Spark job run to Databricks using the api/2.0/jobs/runs/submit API endpoint.

There are two ways to instantiate this operator.

In the first way, you can take the JSON payload that you typically use to call the api/2.0/jobs/runs/submit endpoint and pass it directly to our DatabricksSubmitRunOperator through the json parameter. For example

json = {
  'new_cluster': {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
  },
  'notebook_task': {
    'notebook_path': '/Users/airflow@example.com/PrepareData',
  },
}
notebook_run = DatabricksSubmitRunOperator(task_id='notebook_run', json=json)

Another way to accomplish the same thing is to use the named parameters of the DatabricksSubmitRunOperator directly. Note that there is exactly one named parameter for each top level parameter in the runs/submit endpoint. In this method, your code would look like this:

new_cluster = {
  'spark_version': '2.1.0-db3-scala2.11',
  'num_workers': 2
}
notebook_task = {
  'notebook_path': '/Users/airflow@example.com/PrepareData',
}
notebook_run = DatabricksSubmitRunOperator(
    task_id='notebook_run',
    new_cluster=new_cluster,
    notebook_task=notebook_task)

In the case where both the json parameter AND the named parameters are provided, they will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.

Currently the named parameters that DatabricksSubmitRunOperator supports are
  • spark_jar_task
  • notebook_task
  • new_cluster
  • existing_cluster_id
  • libraries
  • run_name
  • timeout_seconds
Parameters:
  • json (dict) –

    A JSON object containing API parameters which will be passed directly to the api/2.0/jobs/runs/submit endpoint. The other named parameters (i.e. spark_jar_task, notebook_task..) to this operator will be merged with this json dictionary if they are provided. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys. (templated)

    See also

    For more information about templating see Jinja Templating. https://docs.databricks.com/api/latest/jobs.html#runs-submit

  • spark_jar_task (dict) –

    The main class and parameters for the JAR task. Note that the actual JAR is specified in the libraries. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.

  • notebook_task (dict) –

    The notebook path and parameters for the notebook task. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.

  • new_cluster (dict) –

    Specs for a new cluster on which this task will be run. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.

  • existing_cluster_id (str) – ID for existing cluster on which to run this task. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
  • libraries (list of dicts) –

    Libraries which this run will use. This field will be templated.

  • run_name (str) – The run name used for this task. By default this will be set to the Airflow task_id. This task_id is a required parameter of the superclass BaseOperator. This field will be templated.
  • timeout_seconds (int32) – The timeout for this run. By default a value of 0 is used which means to have no timeout. This field will be templated.
  • databricks_conn_id (str) – The name of the Airflow connection to use. By default and in the common case this will be databricks_default. To use token based authentication, provide the key token in the extra field for the connection.
  • polling_period_seconds (int) – Controls the rate which we poll for the result of this run. By default the operator will poll every 30 seconds.
  • databricks_retry_limit (int) – Amount of times retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1.
  • databricks_retry_delay (float) – Number of seconds to wait between retries (it might be a floating point number).
  • do_xcom_push (bool) – Whether we should push run_id and run_page_url to xcom.
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Parameters:
  • template (str) – The reference to the DataFlow template.
  • job_name – The ‘jobName’ to use when executing the DataFlow template (templated).
  • dataflow_default_options (dict) – Map of default job environment options.
  • parameters (dict) – Map of job specific parameters for the template.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.

It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'region': 'europe-west1',
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-staging-bucket/staging/',
        }
    }
}

You need to pass the path to your dataflow template as a file reference with the template parameter. Use parameters to pass on parameters to your job. Use environment to pass on runtime environment variables to your job.

t1 = DataflowTemplateOperator(
    task_id='datapflow_example',
    template='{{var.value.gcp_dataflow_base}}',
    parameters={
        'inputFile': "gs://bucket/input/my_input.txt",
        'outputFile': "gs://bucket/output/my_output.txt"
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my-dag)

template, dataflow_default_options, parameters, and job_name are templated so you can use variables in them.

Note that dataflow_default_options is expected to save high-level options for project information, which apply to all dataflow operators in the DAG.

class airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Create a new cluster on Google Cloud Dataproc. The operator will wait until the creation is successful or an error occurs in the creation process.

The parameters allow to configure the cluster. Please refer to

https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

for a detailed explanation on the different parameters. Most of the configuration parameters detailed in the link are available as a parameter to this operator.

Parameters:
  • cluster_name (str) – The name of the DataProc cluster to create. (templated)
  • project_id (str) – The ID of the google cloud project in which to create the cluster. (templated)
  • num_workers (int) – The # of workers to spin up. If set to zero will spin up cluster in a single node mode
  • storage_bucket (str) – The storage bucket to use, setting to None lets dataproc generate a custom one for you
  • init_actions_uris (list[str]) – List of GCS uri’s containing dataproc initialization scripts
  • init_action_timeout (str) – Amount of time executable scripts in init_actions_uris has to complete
  • metadata (dict) – dict of key-value google compute engine metadata entries to add to all instances
  • image_version (str) – the version of software inside the Dataproc cluster
  • custom_image (str) – custom Dataproc image for more info see https://cloud.google.com/dataproc/docs/guides/dataproc-images
  • properties (dict) – dict of properties to set on config files (e.g. spark-defaults.conf), see https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig
  • master_machine_type (str) – Compute engine machine type to use for the master node
  • master_disk_type (str) – Type of the boot disk for the master node (default is pd-standard). Valid values: pd-ssd (Persistent Disk Solid State Drive) or pd-standard (Persistent Disk Hard Disk Drive).
  • master_disk_size (int) – Disk size for the master node
  • worker_machine_type (str) – Compute engine machine type to use for the worker nodes
  • worker_disk_type (str) – Type of the boot disk for the worker node (default is pd-standard). Valid values: pd-ssd (Persistent Disk Solid State Drive) or pd-standard (Persistent Disk Hard Disk Drive).
  • worker_disk_size (int) – Disk size for the worker nodes
  • num_preemptible_workers (int) – The # of preemptible worker nodes to spin up
  • labels (dict) – dict of labels to add to the cluster
  • zone (str) – The zone where the cluster will be located. (templated)
  • network_uri (str) – The network uri to be used for machine communication, cannot be specified with subnetwork_uri
  • subnetwork_uri (str) – The subnetwork uri to be used for machine communication, cannot be specified with network_uri
  • internal_ip_only (bool) – If true, all instances in the cluster will only have internal IP addresses. This can only be enabled for subnetwork enabled networks
  • tags (list[str]) – The GCE tags to add to all instances
  • region (str) – leave as ‘global’, might become relevant in the future. (templated)
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • service_account (str) – The service account of the dataproc instances.
  • service_account_scopes (list[str]) – The URIs of service account scopes to be included.
  • idle_delete_ttl (int) – The longest duration that cluster would keep alive while staying idle. Passing this threshold will cause cluster to be auto-deleted. A duration in seconds.
  • auto_delete_time (datetime.datetime) – The time when cluster will be auto-deleted.
  • auto_delete_ttl (int) – The life duration of cluster, the cluster will be auto-deleted at the end of this duration. A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
  • customer_managed_key (str) – The customer-managed key used for disk encryption (projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME])
class airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a cluster on Google Cloud Dataproc. The operator will wait until the cluster is destroyed.

Parameters:
  • cluster_name (str) – The name of the cluster to delete. (templated)
  • project_id (str) – The ID of the google cloud project in which the cluster runs. (templated)
  • region (str) – leave as ‘global’, might become relevant in the future. (templated)
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Scale, up or down, a cluster on Google Cloud Dataproc. The operator will wait until the cluster is re-scaled.

Example:

t1 = DataprocClusterScaleOperator(
        task_id='dataproc_scale',
        project_id='my-project',
        cluster_name='cluster-1',
        num_workers=10,
        num_preemptible_workers=10,
        graceful_decommission_timeout='1h',
        dag=dag)

See also

For more detail on about scaling clusters have a look at the reference: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters

Parameters:
  • cluster_name (str) – The name of the cluster to scale. (templated)
  • project_id (str) – The ID of the google cloud project in which the cluster runs. (templated)
  • region (str) – The region for the dataproc cluster. (templated)
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • num_workers (int) – The new number of workers
  • num_preemptible_workers (int) – The new number of preemptible workers
  • graceful_decommission_timeout (str) – Timeout for graceful YARN decomissioning. Maximum value is 1d
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator(**kwargs)[source]

Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator

Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.

Parameters:
  • template (map) – The template contents. (templated)
  • project_id (str) – The ID of the google cloud project in which the template runs
  • region (str) – leave as ‘global’, might become relevant in the future
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(**kwargs)[source]

Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator

Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.

Parameters:
  • template_id (str) – The id of the template. (templated)
  • project_id (str) – The ID of the google cloud project in which the template runs
  • region (str) – leave as ‘global’, might become relevant in the future
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Export entities from Google Cloud Datastore to Cloud Storage

Parameters:
  • bucket (str) – name of the cloud storage bucket to backup data
  • namespace (str) – optional namespace path in the specified Cloud Storage bucket to backup data. If this namespace does not exist in GCS, it will be created.
  • datastore_conn_id (str) – the name of the Datastore connection id to use
  • cloud_storage_conn_id (str) – the name of the cloud storage connection id to force-write backup
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
  • labels (dict) – client-assigned labels for cloud storage
  • polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
  • overwrite_existing (bool) – if the storage bucket + namespace is not empty, it will be emptied prior to exports. This enables overwriting existing backups.
class airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Import entities from Cloud Storage to Google Cloud Datastore

Parameters:
  • bucket (str) – container in Cloud Storage to store data
  • file (str) – path of the backup metadata file in the specified Cloud Storage bucket. It should have the extension .overall_export_metadata
  • namespace (str) – optional namespace of the backup metadata file in the specified Cloud Storage bucket.
  • entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
  • labels (dict) – client-assigned labels for cloud storage
  • datastore_conn_id (str) – the name of the connection id to use
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
class airflow.contrib.operators.discord_webhook_operator.DiscordWebhookOperator(**kwargs)[source]

Bases: airflow.operators.http_operator.SimpleHttpOperator

This operator allows you to post messages to Discord using incoming webhooks. Takes a Discord connection ID with a default relative webhook endpoint. The default endpoint can be overridden using the webhook_endpoint parameter (https://discordapp.com/developers/docs/resources/webhook).

Each Discord webhook can be pre-configured to use a specific username and avatar_url. You can override these defaults in this operator.

Parameters:
  • http_conn_id (str) – Http connection ID with host as “https://discord.com/api/” and default webhook endpoint in the extra field in the form of {“webhook_endpoint”: “webhooks/{webhook.id}/{webhook.token}”}
  • webhook_endpoint (str) – Discord webhook endpoint in the form of “webhooks/{webhook.id}/{webhook.token}”
  • message (str) – The message you want to send to your Discord channel (max 2000 characters). (templated)
  • username (str) – Override the default username of the webhook. (templated)
  • avatar_url (str) – Override the default avatar of the webhook
  • tts (bool) – Is a text-to-speech message
  • proxy (str) – Proxy to use to make the Discord webhook call
execute(context)[source]

Call the DiscordWebhookHook to post message

class airflow.contrib.operators.druid_operator.DruidOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Allows to submit a task directly to druid

Parameters:
  • json_index_file (str) – The filepath to the druid index specification
  • druid_ingest_conn_id (str) – The connection id of the Druid overlord which accepts index jobs
class airflow.contrib.operators.ecs_operator.ECSOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a task on AWS EC2 Container Service

Parameters:
  • task_definition (str) – the task definition name on EC2 Container Service
  • cluster (str) – the cluster name on EC2 Container Service
  • overrides (dict) – the same parameter that boto3 will receive (templated): http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
  • aws_conn_id (str) – connection id of AWS credentials / region name. If None, credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
  • region_name (str) – region name to use in AWS Hook. Override the region_name in connection (if provided)
  • launch_type (str) – the launch type on which to run your task (‘EC2’ or ‘FARGATE’)
  • group (str) – the name of the task group associated with the task
  • placement_constraints (list) – an array of placement constraint objects to use for the task
  • platform_version (str) – the platform version on which your task is running
  • network_configuration (dict) – the network configuration for the task
class airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that adds steps to an existing EMR job_flow.

Parameters:
  • job_flow_id (str) – id of the JobFlow to add steps to. (templated)
  • aws_conn_id (str) – aws connection to uses
  • steps (list) – boto3 style steps to be added to the jobflow. (templated)
class airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed that override the config from the connection.

Parameters:
  • aws_conn_id (str) – aws connection to uses
  • emr_conn_id (str) – emr connection to use
  • job_flow_overrides (dict) – boto3 style arguments to override emr_connection extra. (templated)
class airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Operator to terminate EMR JobFlows.

Parameters:
  • job_flow_id (str) – id of the JobFlow to terminate. (templated)
  • aws_conn_id (str) – aws connection to uses
class airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Uploads a file to Google Cloud Storage. Optionally can compress the file for upload.

Parameters:
  • src (str) – Path to the local file. (templated)
  • dst (str) – Destination path within the specified bucket. (templated)
  • bucket (str) – The bucket to upload to. (templated)
  • google_cloud_storage_conn_id (str) – The Airflow connection ID to upload with
  • mime_type (str) – The mime-type string
  • delegate_to (str) – The account to impersonate, if any
  • gzip (bool) – Allows for file to be compressed and uploaded as gzip
execute(context)[source]

Uploads the file to Google cloud storage

class airflow.contrib.operators.file_to_wasb.FileToWasbOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Uploads a file to Azure Blob Storage.

Parameters:
  • file_path (str) – Path to the file to load. (templated)
  • container_name (str) – Name of the container. (templated)
  • blob_name (str) – Name of the blob. (templated)
  • wasb_conn_id (str) – Reference to the wasb connection.
  • load_options (dict) – Optional keyword arguments that WasbHook.load_file() takes.
execute(context)[source]

Upload a file to Azure Blob Storage.

class airflow.contrib.operators.gcp_container_operator.GKEClusterCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Create a Google Kubernetes Engine Cluster of specified dimensions The operator will wait until the cluster is created.

The minimum required to define a cluster to create is:

dict() ::
cluster_def = {‘name’: ‘my-cluster-name’,
‘initial_node_count’: 1}

or

Cluster proto ::

from google.cloud.container_v1.types import Cluster

cluster_def = Cluster(name=’my-cluster-name’, initial_node_count=1)

Operator Creation:

operator = GKEClusterCreateOperator(
            task_id='cluster_create',
            project_id='my-project',
            location='my-location'
            body=cluster_def)

See also

For more detail on about creating clusters have a look at the reference: google.cloud.container_v1.types.Cluster

Parameters:
  • project_id (str) – The Google Developers Console [project ID or project number]
  • location (str) – The name of the Google Compute Engine zone in which the cluster resides.
  • body (dict or google.cloud.container_v1.types.Cluster) – The Cluster definition to create, can be protobuf or python dict, if dict it must match protobuf message Cluster
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • api_version (str) – The api version to use
class airflow.contrib.operators.gcp_container_operator.GKEClusterDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes the cluster, including the Kubernetes endpoint and all worker nodes.

To delete a certain cluster, you must specify the project_id, the name of the cluster, the location that the cluster is in, and the task_id.

Operator Creation:

operator = GKEClusterDeleteOperator(
            task_id='cluster_delete',
            project_id='my-project',
            location='cluster-location'
            name='cluster-name')
Parameters:
  • project_id (str) – The Google Developers Console [project ID or project number]
  • name (str) – The name of the resource to delete, in this case cluster name
  • location (str) – The name of the Google Compute Engine zone in which the cluster resides.
  • gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform.
  • api_version (str) – The api version to use
class airflow.contrib.operators.gcp_container_operator.GKEPodOperator(**kwargs)[source]

Bases: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator

Executes a task in a Kubernetes pod in the specified Google Kubernetes Engine cluster

This Operator assumes that the system has gcloud installed and either has working default application credentials or has configured a connection id with a service account.

The minimum required to define a cluster to create are the variables task_id, project_id, location, cluster_name, name, namespace, and image

Operator Creation:

operator = GKEPodOperator(task_id='pod_op',
                          project_id='my-project',
                          location='us-central1-a',
                          cluster_name='my-cluster-name',
                          name='task-name',
                          namespace='default',
                          image='perl')

See also

For more detail about application authentication have a look at the reference: https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application

Parameters:
  • project_id (str) – The Google Developers Console project id
  • location (str) – The name of the Google Kubernetes Engine zone in which the cluster resides, e.g. ‘us-central1-a’
  • cluster_name (str) – The name of the Google Kubernetes Engine cluster the pod should be spawned in
  • gcp_conn_id (str) – The google cloud connection id to use. This allows for users to specify a service account.
class airflow.contrib.operators.gcp_compute_operator.GceBaseOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Abstract base operator for Google Compute Engine operators to inherit from.

class airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

Patches the Instance Group Manager, replacing source template URL with the destination one. API V1 does not have update/patch operations for Instance Group Manager, so you must use beta or newer API version. Beta is the default.

See also

For more information on how to use this operator, take a look at the guide: GceInstanceGroupManagerUpdateTemplateOperator

Parameters:
  • resource_id (str) – Name of the Instance Group Manager
  • zone (str) – Google Cloud Platform zone where the Instance Group Manager exists.
  • source_template (str) – URL of the template to replace.
  • destination_template (str) – URL of the target template.
  • project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
  • request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example when client call times out repeating the request with the same request id will not create a new instance template again). It should be in UUID format as defined in RFC 4122.
  • gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
  • api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to v1.
  • validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
class airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

Starts an instance in Google Compute Engine.

See also

For more information on how to use this operator, take a look at the guide: GceInstanceStartOperator

Parameters:
  • zone (str) – Google Cloud Platform zone where the instance exists.
  • resource_id (str) – Name of the Compute Engine instance resource.
  • project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
  • api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to v1.
  • validate_body – Optional, If set to False, body validation is not performed. Defaults to False.
class airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

Stops an instance in Google Compute Engine.

See also

For more information on how to use this operator, take a look at the guide: GceInstanceStopOperator

Parameters:
  • zone (str) – Google Cloud Platform zone where the instance exists.
  • resource_id (str) – Name of the Compute Engine instance resource.
  • project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
  • api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to v1.
  • validate_body – Optional, If set to False, body validation is not performed. Defaults to False.
class airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

Copies the instance template, applying specified changes.

See also

For more information on how to use this operator, take a look at the guide: GceInstanceTemplateCopyOperator

Parameters:
  • resource_id (str) – Name of the Instance Template
  • body_patch (dict) – Patch to the body of instanceTemplates object following rfc7386 PATCH semantics. The body_patch content follows https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates Name field is required as we need to rename the template, all the other fields are optional. It is important to follow PATCH semantics - arrays are replaced fully, so if you need to update an array you should provide the whole target array as patch element.
  • project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
  • request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example when client call times out repeating the request with the same request id will not create a new instance template again). It should be in UUID format as defined in RFC 4122.
  • gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
  • api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to v1.
  • validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
class airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator(**kwargs)[source]

Bases: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

Changes the machine type for a stopped instance to the machine type specified in
the request.

See also

For more information on how to use this operator, take a look at the guide: GceSetMachineTypeOperator

Parameters:
  • zone (str) – Google Cloud Platform zone where the instance exists.
  • resource_id (str) – Name of the Compute Engine instance resource.
  • body (dict) – Body required by the Compute Engine setMachineType API, as described in https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType#request-body
  • project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
  • gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
  • api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to v1.
  • validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
class airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes the specified function from Google Cloud Functions.

See also

For more information on how to use this operator, take a look at the guide: GcfFunctionDeleteOperator

Parameters:
  • name (str) – A fully-qualified function name, matching the pattern: ^projects/[^/]+/locations/[^/]+/functions/[^/]+$
  • gcp_conn_id (str) – The connection ID to use to connect to Google Cloud Platform.
  • api_version (str) – API version used (for example v1 or v1beta1).
class airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a function in Google Cloud Functions. If a function with this name already exists, it will be updated.

See also

For more information on how to use this operator, take a look at the guide: GcfFunctionDeployOperator

Parameters:
  • location (str) – Google Cloud Platform region where the function should be created.
  • body (dict or google.cloud.functions.v1.CloudFunction) – Body of the Cloud Functions definition. The body must be a Cloud Functions dictionary as described in: https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions . Different API versions require different variants of the Cloud Functions dictionary.
  • project_id (str) – (Optional) Google Cloud Platform project ID where the function should be created.
  • gcp_conn_id (str) – (Optional) The connection ID used to connect to Google Cloud Platform - default ‘google_cloud_default’.
  • api_version (str) – (Optional) API version used (for example v1 - default - or v1beta1).
  • zip_path (str) – Path to zip file containing source code of the function. If the path is set, the sourceUploadUrl should not be specified in the body or it should be empty. Then the zip file will be uploaded using the upload URL generated via generateUploadUrl from the Cloud Functions API.
  • validate_body (bool) – If set to False, body validation is not performed.
class airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new ACL entry on the specified bucket.

See also

For more information on how to use this operator, take a look at the guide: GoogleCloudStorageBucketCreateAclEntryOperator

Parameters:
  • bucket (str) – Name of a bucket.
  • entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
  • role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”, “WRITER”.
  • user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
class airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new bucket. Google Cloud Storage uses a flat namespace, so you can’t create a bucket with a name that is already in use.

See also

For more information, see Bucket Naming Guidelines: https://cloud.google.com/storage/docs/bucketnaming.html#requirements

Parameters:
  • bucket_name (str) – The name of the bucket. (templated)
  • storage_class (str) –

    This defines how objects in the bucket are stored and determines the SLA and the cost of storage (templated). Values include

    • MULTI_REGIONAL
    • REGIONAL
    • STANDARD
    • NEARLINE
    • COLDLINE.

    If this value is not specified when the bucket is created, it will default to STANDARD.

  • location (str) –

    The location of the bucket. (templated) Object data for objects in the bucket resides in physical storage within this region. Defaults to US.

  • project_id (str) – The ID of the GCP Project. (templated)
  • labels (dict) – User-provided labels, in key/value pairs.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

:Example:: The following Operator would create a new bucket test-bucket with MULTI_REGIONAL storage class in EU region

CreateBucket = GoogleCloudStorageCreateBucketOperator(
    task_id='CreateNewBucket',
    bucket_name='test-bucket',
    storage_class='MULTI_REGIONAL',
    location='EU',
    labels={'env': 'dev', 'team': 'airflow'},
    google_cloud_storage_conn_id='airflow-service-account'
)
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Downloads a file from Google Cloud Storage.

Parameters:
  • bucket (str) – The Google cloud storage bucket where the object is. (templated)
  • object (str) – The name of the object to download in the Google cloud storage bucket. (templated)
  • filename (str) – The file path on the local file system (where the operator is being executed) that the file should be downloaded to. (templated) If no filename passed, the downloaded data will not be stored on the local file system.
  • store_to_xcom_key (str) – If this param is set, the operator will push the contents of the downloaded file to XCom with the key set in this parameter. If not set, the downloaded data will not be pushed to XCom. (templated)
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

List all objects from the bucket with the give string prefix and delimiter in name.

This operator returns a python list with the name of objects which can be used by
xcom in the downstream task.
Parameters:
  • bucket (str) – The Google cloud storage bucket to find the objects. (templated)
  • prefix (str) – Prefix string which filters objects whose name begin with this prefix. (templated)
  • delimiter (str) – The delimiter by which you want to filter the objects. (templated) For e.g to lists the CSV files from in a directory in GCS you would use delimiter=’.csv’.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Example:

The following Operator would list all the Avro files from sales/sales-2017 folder in data bucket.

GCS_Files = GoogleCloudStorageListOperator(
    task_id='GCS_Files',
    bucket='data',
    prefix='sales/sales-2017/',
    delimiter='.avro',
    google_cloud_storage_conn_id=google_cloud_conn_id
)
class airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new ACL entry on the specified object.

See also

For more information on how to use this operator, take a look at the guide: GoogleCloudStorageObjectCreateAclEntryOperator

Parameters:
  • bucket (str) – Name of a bucket.
  • object_name (str) – Name of the object. For information about how to URL encode object names to be path safe, see: https://cloud.google.com/storage/docs/json_api/#encoding
  • entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
  • role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”.
  • generation (str) – (Optional) If present, selects a specific revision of this object (as opposed to the latest version, the default).
  • user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Loads files from Google cloud storage into BigQuery.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.

See also

For more information on how to use this operator, take a look at the guide: GoogleCloudStorageToBigQueryOperator

Parameters:
  • bucket (str) – The bucket to load from. (templated)
  • source_objects (list[str]) – List of Google cloud storage URIs to load from. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
  • destination_project_dataset_table (str) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into. If <project> is not included, project will be the project defined in the connection json. (templated)
  • schema_fields (list) – If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load Should not be set when source_format is ‘DATASTORE_BACKUP’.
  • schema_object (str) – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
  • source_format (str) – File format to export.
  • compression (str) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
  • create_disposition (str) – The create disposition if the table doesn’t exist.
  • skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
  • write_disposition (str) – The write disposition if the table already exists.
  • field_delimiter (str) – The delimiter to use when loading from a CSV.
  • max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
  • quote_character (str) – The value that is used to quote data sections in a CSV file.
  • ignore_unknown_values (bool) – [Optional] Indicates if BigQuery should allow extra values that are not represented in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result.
  • allow_quoted_newlines (bool) – Whether to allow quoted newlines (true) or not (false).
  • allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
  • max_id_key (str) – If set, the name of a column in the BigQuery table that’s to be loaded. This will be used to select the MAX value from BigQuery after the load occurs. The results will be returned by the execute() command, which in turn gets stored in XCom for future operators to use. This can be helpful with incremental loads–during future executions, you can pick up from the max ID.
  • bigquery_conn_id (str) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (str) – Reference to a specific Google cloud storage hook.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • schema_update_options (list) – Allows the schema of the destination table to be updated as a side effect of the load job.
  • src_fmt_configs (dict) – configure optional fields specific to the source format
  • external_table (bool) – Flag to specify if the destination table should be a BigQuery external table. Default Value is False.
  • time_partitioning (dict) – configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. Note that ‘field’ is not available in concurrency with dataset.table$partition.
  • cluster_fields (list[str]) – Request that the result of this load be stored sorted by one or more columns. This is only available in conjunction with time_partitioning. The order of columns given determines the sort order. Not applicable for external tables.
class airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copies objects from a bucket to another, with renaming if requested.

Parameters:
  • source_bucket (str) – The source Google cloud storage bucket where the object is. (templated)
  • source_object (str) – The source name of the object to copy in the Google cloud storage bucket. (templated) You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear inside the object name or at the end of the object name. Appending a wildcard to the bucket name is unsupported.
  • destination_bucket (str) – The destination Google cloud storage bucket where the object should be. (templated)
  • destination_object (str) – The destination name of the object in the destination Google cloud storage bucket. (templated) If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended to the final destination objects’ paths. Note that the source path’s part before the wildcard will be removed; if it needs to be retained it should be appended to destination_object. For example, with prefix foo/* and destination_object blah/, the file foo/baz will be copied to blah/baz; to retain the prefix write the destination_object as e.g. blah/foo, in which case the copied file will be named blah/foo/baz.
  • move_object (bool) – When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • last_modified_time (datetime.datetime) – When specified, if the object(s) were modified after last_modified_time, they will be copied/moved. If tzinfo has not been set, UTC will be assumed.
Example:

The following Operator would copy a single file named sales/sales-2017/january.avro in the data bucket to the file named copied_sales/2017/january-backup.avro in the data_backup bucket

copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy_single_file',
    source_bucket='data',
    source_object='sales/sales-2017/january.avro',
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/january-backup.avro',
    google_cloud_storage_conn_id=google_cloud_conn_id
)

The following Operator would copy all the Avro files from sales/sales-2017 folder (i.e. with names starting with that prefix) in data bucket to the copied_sales/2017 folder in the data_backup bucket.

copy_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy_files',
    source_bucket='data',
    source_object='sales/sales-2017/*.avro',
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/',
    google_cloud_storage_conn_id=google_cloud_conn_id
)

The following Operator would move all the Avro files from sales/sales-2017 folder (i.e. with names starting with that prefix) in data bucket to the same folder in the data_backup bucket, deleting the original files in the process.

move_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='move_files',
    source_bucket='data',
    source_object='sales/sales-2017/*.avro',
    destination_bucket='data_backup',
    move_object=True,
    google_cloud_storage_conn_id=google_cloud_conn_id
)
class airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copies objects from a bucket to another using the GCP Storage Transfer Service.

Parameters:

Example:

gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
     task_id='gcs_to_gcs_transfer_example',
     source_bucket='my-source-bucket',
     destination_bucket='my-destination-bucket',
     project_id='my-gcp-project',
     dag=my_dag)
class airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator(**kwargs)[source]

Bases: airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator

Synchronizes a Google Cloud Storage bucket with an S3 bucket.

Parameters:
  • bucket (str) – The Google Cloud Storage bucket to find the objects. (templated)
  • prefix (str) – Prefix string which filters objects whose name begin with this prefix. (templated)
  • delimiter (str) – The delimiter by which you want to filter the objects. (templated) For e.g to lists the CSV files from in a directory in GCS you would use delimiter=’.csv’.
  • google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • dest_aws_conn_id (str) – The destination S3 connection
  • dest_s3_key (str) – The base S3 key to be used to store the files. (templated)
  • dest_verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
class airflow.contrib.operators.hipchat_operator.HipChatAPIOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Base HipChat Operator. All derived HipChat operators reference from HipChat’s official REST API documentation at https://www.hipchat.com/docs/apiv2. Before using any HipChat API operators you need to get an authentication token at https://www.hipchat.com/docs/apiv2/auth. In the future additional HipChat operators will be derived from this class as well.

Parameters:
  • token (str) – HipChat REST API authentication token
  • base_url (str) – HipChat REST API base url.
prepare_request()[source]

Used by the execute function. Set the request method, url, and body of HipChat’s REST API call. Override in child class. Each HipChatAPI child operator is responsible for having a prepare_request method call which sets self.method, self.url, and self.body.

class airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator(**kwargs)[source]

Bases: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator

Send notification to a specific HipChat room. More info: https://www.hipchat.com/docs/apiv2/method/send_room_notification

Parameters:
  • room_id (str) – Room in which to send notification on HipChat. (templated)
  • message (str) – The message body. (templated)
  • frm (str) – Label to be shown in addition to sender’s name
  • message_format (str) – How the notification is rendered: html or text
  • color (str) – Background color of the msg: yellow, green, red, purple, gray, or random
  • attach_to (str) – The message id to attach this notification to
  • notify (bool) – Whether this message should trigger a user notification
  • card (dict) – HipChat-defined card object
prepare_request()[source]

Used by the execute function. Set the request method, url, and body of HipChat’s REST API call. Override in child class. Each HipChatAPI child operator is responsible for having a prepare_request method call which sets self.method, self.url, and self.body.

class airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Hive to DynamoDB, note that for now the data is loaded into memory before being pushed to DynamoDB, so this operator should be used for smallish amount of data.

Parameters:
  • sql (str) – SQL query to execute against the hive database. (templated)
  • table_name (str) – target DynamoDB table
  • table_keys (list) – partition key and sort key
  • pre_process (function) – implement pre-processing of source data
  • pre_process_args (list) – list of pre_process function arguments
  • pre_process_kwargs (dict) – dict of pre_process function arguments
  • region_name (str) – aws region name (example: us-east-1)
  • schema (str) – hive database schema
  • hiveserver2_conn_id (str) – source hive connection
  • aws_conn_id (str) – aws connection
class airflow.contrib.operators.imap_attachment_to_s3_operator.ImapAttachmentToS3Operator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Transfers a mail attachment from a mail server into s3 bucket.

Parameters:
  • imap_attachment_name (str) – The file name of the mail attachment that you want to transfer.
  • s3_key (str) – The destination file name in the s3 bucket for the attachment.
  • imap_mail_folder (str) – The folder on the mail server to look for the attachment.
  • imap_check_regex (bool) – If set checks the imap_attachment_name for a regular expression.
  • s3_overwrite (bool) – If set overwrites the s3 key if already exists.
  • imap_conn_id (str) – The reference to the connection details of the mail server.
  • s3_conn_id (str) – The reference to the s3 connection details.
execute(context)[source]

This function executes the transfer from the email server (via imap) into s3.

Parameters:context (dict) – The context while executing.
class airflow.contrib.operators.jenkins_job_trigger_operator.JenkinsJobTriggerOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Trigger a Jenkins Job and monitor it’s execution. This operator depend on python-jenkins library, version >= 0.4.15 to communicate with jenkins server. You’ll also need to configure a Jenkins connection in the connections screen.

Parameters:
  • jenkins_connection_id (str) – The jenkins connection to use for this job
  • job_name (str) – The name of the job to trigger
  • parameters (str) – The parameters block to provide to jenkins. (templated)
  • sleep_time (int) – How long will the operator sleep between each status request for the job (min 1, default 10)
  • max_try_before_job_appears (int) – The maximum number of requests to make while waiting for the job to appears on jenkins server (default 10)
build_job(jenkins_server)[source]

This function makes an API call to Jenkins to trigger a build for ‘job_name’ It returned a dict with 2 keys : body and headers. headers contains also a dict-like object which can be queried to get the location to poll in the queue.

Parameters:jenkins_server – The jenkins server where the job should be triggered
Returns:Dict containing the response body (key body) and the headers coming along (headers)
poll_job_in_queue(location, jenkins_server)[source]

This method poll the jenkins queue until the job is executed. When we trigger a job through an API call, the job is first put in the queue without having a build number assigned. Thus we have to wait the job exit the queue to know its build number. To do so, we have to add /api/json (or /api/xml) to the location returned by the build_job call and poll this file. When a ‘executable’ block appears in the json, it means the job execution started and the field ‘number’ then contains the build number.

Parameters:
  • location – Location to poll, returned in the header of the build_job call
  • jenkins_server – The jenkins server to poll
Returns:

The build_number corresponding to the triggered job

class airflow.contrib.operators.jira_operator.JiraOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

JiraOperator to interact and perform action on Jira issue tracking system. This operator is designed to use Jira Python SDK: http://jira.readthedocs.io

Parameters:
  • jira_conn_id (str) – reference to a pre-defined Jira Connection
  • jira_method (str) – method name from Jira Python SDK to be called
  • jira_method_args (dict) – required method parameters for the jira_method. (templated)
  • result_processor (function) – function to further process the response from Jira
  • get_jira_resource_method (function) – function or operator to get jira resource on which the provided jira_method will be executed
class airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a task in a Kubernetes Pod

Parameters:
  • image (str) – Docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories
  • namespace (str) – the namespace to run within kubernetes
  • cmds (list[str]) – entrypoint of the container. (templated) The docker images’s entrypoint is used if this is not provide.
  • arguments (list[str]) – arguments of the entrypoint. (templated) The docker image’s CMD is used if this is not provided.
  • image_pull_policy (str) – Specify a policy to cache or always pull an image
  • image_pull_secrets (str) – Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b
  • volume_mounts (list[airflow.contrib.kubernetes.volume_mount.VolumeMount]) – volumeMounts for launched pod
  • volumes (list[airflow.contrib.kubernetes.volume.Volume]) – volumes for launched pod. Includes ConfigMaps and PersistentVolumes
  • labels (dict) – labels to apply to the Pod
  • startup_timeout_seconds (int) – timeout in seconds to startup the pod
  • name (str) – name of the task you want to run, will be used to generate a pod id
  • env_vars (dict) – Environment variables initialized in the container. (templated)
  • secrets (list[airflow.contrib.kubernetes.secret.Secret]) – Kubernetes secrets to inject in the container, They can be exposed as environment vars or files in a volume.
  • in_cluster (bool) – run kubernetes client with in_cluster configuration
  • cluster_context (str) – context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used.
  • get_logs (bool) – get the stdout of the container as logs of the tasks
  • affinity (dict) – A dict containing a group of affinity scheduling rules
  • node_selectors (dict) – A dict containing a group of scheduling rules
  • config_file (str) – The path to the Kubernetes config file
  • do_xcom_push (bool) – If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.
  • hostnetwork (bool) – If True enable host networking on the pod
  • tolerations (list tolerations) – A list of kubernetes tolerations
class airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Google Cloud ML Engine prediction job.

NOTE: For model origin, users should consider exactly one from the three options below:

  1. Populate uri field only, which should be a GCS location that points to a tensorflow savedModel directory.
  2. Populate model_name field only, which refers to an existing model, and the default version of the model will be used.
  3. Populate both model_name and version_name fields, which refers to a specific version of a specific model.

In options 2 and 3, both model and version name should contain the minimal identifier. For instance, call:

MLEngineBatchPredictionOperator(
    ...,
    model_name='my_model',
    version_name='my_version',
    ...)

if the desired model version is projects/my_project/models/my_model/versions/my_version.

See https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs for further documentation on the parameters.

Parameters:
  • project_id (str) – The Google Cloud project name where the prediction job is submitted. (templated)
  • job_id (str) – A unique id for the prediction job on Google Cloud ML Engine. (templated)
  • data_format (str) – The format of the input data. It will default to ‘DATA_FORMAT_UNSPECIFIED’ if is not provided or is not one of [“TEXT”, “TF_RECORD”, “TF_RECORD_GZIP”].
  • input_paths (list[str]) – A list of GCS paths of input data for batch prediction. Accepting wildcard operator *, but only at the end. (templated)
  • output_path (str) – The GCS path where the prediction results are written to. (templated)
  • region (str) – The Google Compute Engine region to run the prediction job in. (templated)
  • model_name (str) – The Google Cloud ML Engine model to use for prediction. If version_name is not provided, the default version of this model will be used. Should not be None if version_name is provided. Should be None if uri is provided. (templated)
  • version_name (str) – The Google Cloud ML Engine model version to use for prediction. Should be None if uri is provided. (templated)
  • uri (str) – The GCS path of the saved model to use for prediction. Should be None if model_name is provided. It should be a GCS path pointing to a tensorflow SavedModel. (templated)
  • max_worker_count (int) – The maximum number of workers to be used for parallel processing. Defaults to 10 if not specified.
  • runtime_version (str) – The Google Cloud ML Engine runtime version to use for batch prediction.
  • gcp_conn_id (str) – The connection ID used for connection to Google Cloud Platform.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Raises:

ValueError: if a unique model/version origin cannot be determined.

class airflow.contrib.operators.mlengine_operator.MLEngineModelOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for managing a Google Cloud ML Engine model.

Parameters:
  • project_id (str) – The Google Cloud project name to which MLEngine model belongs. (templated)
  • model (dict) –

    A dictionary containing the information about the model. If the operation is create, then the model parameter should contain all the information about this model such as name.

    If the operation is get, the model parameter should contain the name of the model.

  • operation (str) –

    The operation to perform. Available operations are:

    • create: Creates a new model as provided by the model parameter.
    • get: Gets a particular model where the name is specified in model.
  • gcp_conn_id (str) – The connection ID to use when fetching connection info.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for launching a MLEngine training job.

Parameters:
  • project_id (str) – The Google Cloud project name within which MLEngine training job should run (templated).
  • job_id (str) – A unique templated id for the submitted Google MLEngine training job. (templated)
  • package_uris (str) – A list of package locations for MLEngine training job, which should include the main training program + any additional dependencies. (templated)
  • training_python_module (str) – The Python module name to run within MLEngine training job after installing ‘package_uris’ packages. (templated)
  • training_args (str) – A list of templated command line arguments to pass to the MLEngine training program. (templated)
  • region (str) – The Google Compute Engine region to run the MLEngine training job in (templated).
  • scale_tier (str) – Resource tier for MLEngine training job. (templated)
  • master_type (str) – Cloud ML Engine machine name. Must be set when scale_tier is CUSTOM. (templated)
  • runtime_version (str) – The Google Cloud ML runtime version to use for training. (templated)
  • python_version (str) – The version of Python used in training. (templated)
  • job_dir (str) – A Google Cloud Storage path in which to store training outputs and other data needed for training. (templated)
  • gcp_conn_id (str) – The connection ID to use when fetching connection info.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • mode (str) – Can be one of ‘DRY_RUN’/’CLOUD’. In ‘DRY_RUN’ mode, no real training job will be launched, but the MLEngine training job request will be printed out. In ‘CLOUD’ mode, a real MLEngine training job creation request will be issued.
class airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for managing a Google Cloud ML Engine version.

Parameters:
  • project_id (str) – The Google Cloud project name to which MLEngine model belongs.
  • model_name (str) – The name of the Google Cloud ML Engine model that the version belongs to. (templated)
  • version_name (str) – A name to use for the version being operated upon. If not None and the version argument is None or does not have a value for the name key, then this will be populated in the payload for the name key. (templated)
  • version (dict) – A dictionary containing the information about the version. If the operation is create, version should contain all the information about this version such as name, and deploymentUrl. If the operation is get or delete, the version parameter should contain the name of the version. If it is None, the only operation possible would be list. (templated)
  • operation (str) –

    The operation to perform. Available operations are:

    • create: Creates a new version in the model specified by model_name, in which case the version parameter should contain all the information to create that version (e.g. name, deploymentUrl).
    • get: Gets full information of a particular version in the model specified by model_name. The name of the version should be specified in the version parameter.
    • list: Lists all available versions of the model specified by model_name.
    • delete: Deletes the version specified in version parameter from the model specified by model_name). The name of the version should be specified in the version parameter.
  • gcp_conn_id (str) – The connection ID to use when fetching connection info.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
class airflow.contrib.operators.mongo_to_s3.MongoToS3Operator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Mongo -> S3

A more specific baseOperator meant to move data from mongo via pymongo to s3 via boto

things to note
.execute() is written to depend on .transform() .transform() is meant to be extended by child classes to perform transformations unique to those operators needs
execute(context)[source]

Executed by task_instance at runtime

static transform(docs)[source]
Processes pyMongo cursor and returns an iterable with each element being
a JSON serializable dictionary

Base transform() assumes no processing is needed ie. docs is a pyMongo cursor of documents and cursor just needs to be passed through

Override this method for custom transformations

class airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copy data from MySQL to Google cloud storage in JSON format.

Parameters:
  • sql (str) – The SQL to execute on the MySQL table.
  • bucket (str) – The bucket to upload to.
  • filename (str) – The filename to use as the object name when uploading to Google cloud storage. A {} should be specified in the filename to allow the operator to inject file numbers in cases where the file is split due to size.
  • schema_filename (str) – If set, the filename to use as the object name when uploading a .json file containing the BigQuery schema fields for the table that was dumped from MySQL.
  • approx_max_file_size_bytes (long) – This operator supports the ability to split large table dumps into multiple files (see notes in the filenamed param docs above). Google cloud storage allows for files to be a maximum of 4GB. This param allows developers to specify the file size of the splits.
  • mysql_conn_id (str) – Reference to a specific MySQL hook.
  • google_cloud_storage_conn_id (str) – Reference to a specific Google cloud storage hook.
  • schema (str or list) – The schema to use, if any. Should be a list of dict or a str. Pass a string if using Jinja template, otherwise, pass a list of dict. Examples could be seen: https://cloud.google.com/bigquery/docs /schemas#specifying_a_json_schema_file
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
classmethod type_map(mysql_type)[source]

Helper function that maps from MySQL fields to BigQuery fields. Used when a schema_filename is set.

class airflow.contrib.operators.oracle_to_azure_data_lake_transfer.OracleToAzureDataLakeTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Oracle to Azure Data Lake. The operator runs the query against Oracle and stores the file locally before loading it into Azure Data Lake.

Parameters:
  • filename (str) – file name to be used by the csv file.
  • azure_data_lake_conn_id (str) – destination azure data lake connection.
  • azure_data_lake_path (str) – destination path in azure data lake to put the file.
  • oracle_conn_id (str) – source Oracle connection.
  • sql (str) – SQL query to execute against the Oracle database. (templated)
  • sql_params (str) – Parameters to use in sql query. (templated)
  • delimiter (str) – field delimiter in the file.
  • encoding (str) – encoding type for the file.
  • quotechar (str) – Character to use in quoting.
  • quoting (str) – Quoting strategy. See unicodecsv quoting for more information.
class airflow.contrib.operators.oracle_to_oracle_transfer.OracleToOracleTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Oracle to Oracle.

Parameters:
  • oracle_destination_conn_id (str) – destination Oracle connection.
  • destination_table (str) – destination table to insert rows.
  • oracle_source_conn_id (str) – source Oracle connection.
  • source_sql (str) – SQL query to execute against the source Oracle database. (templated)
  • source_sql_params (dict) – Parameters to use in sql query. (templated)
  • rows_chunk (int) – number of rows per chunk to commit.
class airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Copy data from Postgres to Google Cloud Storage in JSON format.

classmethod convert_types(value)[source]

Takes a value from Postgres, and converts it to a value that’s safe for JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds. Decimals are converted to floats. Times are converted to seconds.

classmethod type_map(postgres_type)[source]

Helper function that maps from Postgres fields to BigQuery fields. Used when a schema_filename is set.

class airflow.contrib.operators.pubsub_operator.PubSubPublishOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Publish messages to a PubSub topic.

Each Task publishes all provided messages to the same topic in a single GCP project. If the topic does not exist, this task will fail.

from base64 import b64encode as b64e

m1 = {'data': b64e('Hello, World!'),
      'attributes': {'type': 'greeting'}
     }
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishOperator(
    project='my-project',topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag)

project , topic, and messages are templated so you can use variables in them.

class airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub subscription.

By default, the subscription will be created in topic_project. If subscription_project is specified and the GCP credentials allow, the Subscription can be created in a different project from its topic.

By default, if the subscription already exists, this operator will not cause the DAG to fail. However, the topic must exist in the project.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
    )

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription', fail_if_exists=True)
    )

Finally, subscription is not required. If not passed, the operator will generated a universally unique identifier for the subscription’s name.

with DAG('DAG') as dag:
    (
        dag >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic')
    )

topic_project, topic, subscription, and subscription are templated so you can use variables in them.

class airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub subscription.

By default, if the subscription does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubSubscriptionDeleteOperator(project='my-project',
                                            subscription='non-existing')
    )

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubSubscriptionDeleteOperator(
             project='my-project', subscription='non-existing',
             fail_if_not_exists=True)
    )

project, and subscription are templated so you can use variables in them.

class airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub topic.

By default, if the topic already exists, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
    )

The operator can be configured to fail if the topic already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic',
                                     fail_if_exists=True)
    )

Both project and topic are templated so you can use variables in them.

class airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub topic.

By default, if the topic does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubTopicDeleteOperator(project='my-project',
                                     topic='non_existing_topic')
    )

The operator can be configured to fail if the topic does not exist.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='non_existing_topic',
                                     fail_if_not_exists=True)
    )

Both project and topic are templated so you can use variables in them.

class airflow.contrib.operators.qubole_check_operator.QuboleCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator, airflow.contrib.operators.qubole_operator.QuboleOperator

Performs checks against Qubole Commands. QuboleCheckOperator expects a command that will be executed on QDS. By default, each value on first row of the result of this Qubole Command is evaluated using python bool casting. If any of the values return False, the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average.

This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alerts without stopping the progress of the DAG.

Parameters:qubole_conn_id (str) – Connection id which consists of qds auth_token

kwargs:

Arguments specific to Qubole command can be referred from QuboleOperator docs.

results_parser_callable:
 This is an optional parameter to extend the flexibility of parsing the results of Qubole command to the users. This is a python callable which can hold the logic to parse list of rows returned by Qubole command. By default, only the values on first row are used for performing checks. This callable should return a list of records on which the checks have to be performed.

Note

All fields in common with template fields of QuboleOperator and CheckOperator are template-supported.

class airflow.contrib.operators.qubole_operator.QuboleOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute tasks (commands) on QDS (https://qubole.com).

Parameters:qubole_conn_id (str) – Connection id which consists of qds auth_token
kwargs:
command_type:type of command to be executed, e.g. hivecmd, shellcmd, hadoopcmd
tags:array of tags to be assigned with the command
cluster_label:cluster label on which the command will be executed
name:name to be given to command
notify:whether to send email on command completion or not (default is False)

Arguments specific to command types

hivecmd:
query:inline query statement
script_location:
 s3 location containing query statement
sample_size:size of sample in bytes on which to run query
macros:macro values which were used in query
sample_size:size of sample in bytes on which to run query
hive-version:Specifies the hive version to be used. eg: 0.13,1.2,etc.
prestocmd:
query:inline query statement
script_location:
 s3 location containing query statement
macros:macro values which were used in query
hadoopcmd:
sub_commnad:must be one these [“jar”, “s3distcp”, “streaming”] followed by 1 or more args
shellcmd:
script:inline command with args
script_location:
 s3 location containing query statement
files:list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.
archives:list of archives in s3 bucket as archive1,archive2 format. These will be unarchived into the working directory where the qubole command is being executed
parameters:any extra args which need to be passed to script (only when script_location is supplied)
pigcmd:
script:inline query statement (latin_statements)
script_location:
 s3 location containing pig query
parameters:any extra args which need to be passed to script (only when script_location is supplied
sparkcmd:
program:the complete Spark Program in Scala, SQL, Command, R, or Python
cmdline:spark-submit command line, all required information must be specify in cmdline itself.
sql:inline sql query
script_location:
 s3 location containing query statement
language:language of the program, Scala, SQL, Command, R, or Python
app_id:ID of an Spark job server app
arguments:spark-submit command line arguments
user_program_arguments:
 arguments that the user program takes in
macros:macro values which were used in query
note_id:Id of the Notebook to run
dbtapquerycmd:
db_tap_id:data store ID of the target database, in Qubole.
query:inline query statement
macros:macro values which were used in query
dbexportcmd:
mode:Can be 1 for Hive export or 2 for HDFS/S3 export
schema:Db schema name assumed accordingly by database if not specified
hive_table:Name of the hive table
partition_spec:partition specification for Hive table.
dbtap_id:data store ID of the target database, in Qubole.
db_table:name of the db table
db_update_mode:allowinsert or updateonly
db_update_keys:columns used to determine the uniqueness of rows
export_dir:HDFS/S3 location from which data will be exported.
fields_terminated_by:
 hex of the char used as column separator in the dataset
use_customer_cluster:
 To use cluster to run command
customer_cluster_label:
 the label of the cluster to run the command on
additional_options:
 Additional Sqoop options which are needed enclose options in double or single quotes e.g. ‘–map-column-hive id=int,data=string’
dbimportcmd:
mode:1 (simple), 2 (advance)
hive_table:Name of the hive table
schema:Db schema name assumed accordingly by database if not specified
hive_serde:Output format of the Hive Table
dbtap_id:data store ID of the target database, in Qubole.
db_table:name of the db table
where_clause:where clause, if any
parallelism:number of parallel db connections to use for extracting data
extract_query:SQL query to extract data from db. $CONDITIONS must be part of the where clause.
boundary_query:Query to be used get range of row IDs to be extracted
split_column:Column used as row ID to split data into ranges (mode 2)
use_customer_cluster:
 To use cluster to run command
customer_cluster_label:
 the label of the cluster to run the command on
additional_options:
 Additional Sqoop options which are needed enclose options in double or single quotes
class airflow.contrib.operators.qubole_check_operator.QuboleValueCheckOperator(**kwargs)[source]

Bases: airflow.operators.check_operator.ValueCheckOperator, airflow.contrib.operators.qubole_operator.QuboleOperator

Performs a simple value check using Qubole command. By default, each value on the first row of this Qubole command is compared with a pre-defined value. The check fails and errors out if the output of the command is not within the permissible limit of expected value.

Parameters:
  • qubole_conn_id (str) – Connection id which consists of qds auth_token
  • pass_value (str or int or float) – Expected value of the query results.
  • tolerance (int or float) – Defines the permissible pass_value range, for example if tolerance is 2, the Qubole command output can be anything between -2*pass_value and 2*pass_value, without the operator erring out.

kwargs:

Arguments specific to Qubole command can be referred from QuboleOperator docs.

results_parser_callable:
 This is an optional parameter to extend the flexibility of parsing the results of Qubole command to the users. This is a python callable which can hold the logic to parse list of rows returned by Qubole command. By default, only the values on first row are used for performing checks. This callable should return a list of records on which the checks have to be performed.

Note

All fields in common with template fields of QuboleOperator and ValueCheckOperator are template-supported.

class airflow.contrib.operators.s3_copy_object_operator.S3CopyObjectOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a copy of an object that is already stored in S3.

Note: the S3 connection used here needs to have access to both source and destination bucket/key.

Parameters:
  • source_bucket_key (str) –

    The key of the source object.

    It can be either full s3:// style url or relative path from root level.

    When it’s specified as a full s3:// url, please omit source_bucket_name.

  • dest_bucket_key (str) –

    The key of the object to copy to.

    The convention to specify dest_bucket_key is the same as source_bucket_key.

  • source_bucket_name (str) –

    Name of the S3 bucket where the source object is in.

    It should be omitted when source_bucket_key is provided as a full s3:// url.

  • dest_bucket_name (str) –

    Name of the S3 bucket to where the object is copied.

    It should be omitted when dest_bucket_key is provided as a full s3:// url.

  • source_version_id (str) – Version ID of the source object (OPTIONAL)
  • aws_conn_id (str) – Connection id of the S3 connection to use
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified.

    You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used,
      but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
class airflow.contrib.operators.s3_delete_objects_operator.S3DeleteObjectsOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

To enable users to delete single object or multiple objects from a bucket using a single HTTP request.

Users may specify up to 1000 keys to delete.

Parameters:
  • bucket (str) – Name of the bucket in which you are going to delete object(s)
  • keys (str or list) –

    The key(s) to delete from S3 bucket.

    When keys is a string, it’s supposed to be the key name of the single object to delete.

    When keys is a list, it’s supposed to be the list of the keys to delete.

    You may specify up to 1000 keys.

  • aws_conn_id (str) – Connection id of the S3 connection to use
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified.

    You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used,
      but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
class airflow.contrib.operators.s3_list_operator.S3ListOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

List all objects from the bucket with the given string prefix in name.

This operator returns a python list with the name of objects which can be used by xcom in the downstream task.

Parameters:
  • bucket (str) – The S3 bucket where to find the objects. (templated)
  • prefix (str) – Prefix string to filters the objects whose name begin with such prefix. (templated)
  • delimiter (str) – the delimiter marks key hierarchy. (templated)
  • aws_conn_id (str) – The connection ID to use when connecting to S3 storage.
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
Example:

The following operator would list all the files (excluding subfolders) from the S3 customers/2018/04/ key in the data bucket.

s3_file = S3ListOperator(
    task_id='list_3s_files',
    bucket='data',
    prefix='customers/2018/04/',
    delimiter='/',
    aws_conn_id='aws_customers_conn'
)
class airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator(**kwargs)[source]

Bases: airflow.contrib.operators.s3_list_operator.S3ListOperator

Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage destination path.

Parameters:
  • bucket (str) – The S3 bucket where to find the objects. (templated)
  • prefix (str) – Prefix string which filters objects whose name begin with such prefix. (templated)
  • delimiter (str) – the delimiter marks key hierarchy. (templated)
  • aws_conn_id (str) – The source S3 connection
  • verify (bool or str) –

    Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used
      (unless use_ssl is False), but SSL certificates will not be verified.
    • path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
      You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  • dest_gcs_conn_id (str) – The destination connection ID to use when connecting to Google Cloud Storage.
  • dest_gcs (str) – The destination Google Cloud Storage bucket and prefix where you want to store the files. (templated)
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • replace (bool) – Whether you want to replace existing destination files or not.

Example:

s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
     task_id='s3_to_gcs_example',
     bucket='my-s3-bucket',
     prefix='data/customers-201804',
     dest_gcs_conn_id='google_cloud_default',
     dest_gcs='gs://my.gcs.bucket/some/customers/',
     replace=False,
     dag=my-dag)

Note that bucket, prefix, delimiter and dest_gcs are templated, so you can use variables in them if you wish.

class airflow.contrib.operators.s3_to_gcs_transfer_operator.S3ToGoogleCloudStorageTransferOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Synchronizes an S3 bucket with a Google Cloud Storage bucket using the GCP Storage Transfer Service.

Parameters:

Example:

s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator(
     task_id='s3_to_gcs_transfer_example',
     s3_bucket='my-s3-bucket',
     project_id='my-gcp-project',
     gcs_bucket='my-gcs-bucket',
     dag=my_dag)
class airflow.contrib.operators.s3_to_sftp_operator.S3ToSFTPOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This operator enables the transferring of files from S3 to a SFTP server.

Parameters:
  • sftp_conn_id (string) – The sftp connection id. The name or identifier for establishing a connection to the SFTP server.
  • sftp_path (string) – The sftp remote path. This is the specified file path for uploading file to the SFTP server.
  • s3_conn_id (string) – The s3 connection id. The name or identifier for establishing a connection to S3
  • s3_bucket (string) – The targeted s3 bucket. This is the S3 bucket from where the file is downloaded.
  • s3_key (string) – The targeted s3 key. This is the specified file path for downloading the file from S3.
static get_s3_key(s3_key)[source]

This parses the correct format for S3 keys regardless of how the S3 url is passed.

class airflow.contrib.operators.sftp_operator.SFTPOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

SFTPOperator for transferring files from remote host to local or vice a versa. This operator uses ssh_hook to open sftp transport channel that serve as basis for file transfer.

Parameters:
  • ssh_hook (airflow.contrib.hooks.ssh_hook.SSHHook) – predefined ssh_hook to use for remote execution. Either ssh_hook or ssh_conn_id needs to be provided.
  • ssh_conn_id (str) – connection id from airflow Connections. ssh_conn_id will be ignored if ssh_hook is provided.
  • remote_host (str) – remote host to connect (templated) Nullable. If provided, it will replace the remote_host which was defined in ssh_hook or predefined in the connection of ssh_conn_id.
  • local_filepath (str) – local file path to get or put. (templated)
  • remote_filepath (str) – remote file path to get or put. (templated)
  • operation (str) – specify operation ‘get’ or ‘put’, defaults to put
  • confirm (bool) – specify if the SFTP operation should be confirmed, defaults to True
  • create_intermediate_dirs (bool) –

    create missing intermediate directories when copying from remote to local and vice-versa. Default is False.

    Example: The following task would copy file.txt to the remote host at /tmp/tmp1/tmp2/ while creating tmp,``tmp1`` and tmp2 if they don’t exist. If the parameter is not passed it would error as the directory does not exist.

    put_file = SFTPOperator(
        task_id="test_sftp",
        ssh_conn_id="ssh_default",
        local_filepath="/tmp/file.txt",
        remote_filepath="/tmp/tmp1/tmp2/file.txt",
        operation="put",
        create_intermediate_dirs=True,
        dag=dag
    )
    
class airflow.contrib.operators.sftp_to_s3_operator.SFTPToS3Operator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This operator enables the transferring of files from a SFTP server to Amazon S3.

Parameters:
  • sftp_conn_id (string) – The sftp connection id. The name or identifier for establishing a connection to the SFTP server.
  • sftp_path (string) – The sftp remote path. This is the specified file path for downloading the file from the SFTP server.
  • s3_conn_id (string) – The s3 connection id. The name or identifier for establishing a connection to S3
  • s3_bucket (string) – The targeted s3 bucket. This is the S3 bucket to where the file is uploaded.
  • s3_key (string) – The targeted s3 key. This is the specified path for uploading the file to S3.
static get_s3_key(s3_key)[source]

This parses the correct format for S3 keys regardless of how the S3 url is passed.

class airflow.contrib.operators.ssh_operator.SSHOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

SSHOperator to execute commands on given remote host using the ssh_hook.

Parameters:
  • ssh_hook (airflow.contrib.hooks.ssh_hook.SSHHook) – predefined ssh_hook to use for remote execution. Either ssh_hook or ssh_conn_id needs to be provided.
  • ssh_conn_id (str) – connection id from airflow Connections. ssh_conn_id will be ignored if ssh_hook is provided.
  • remote_host (str) – remote host to connect (templated) Nullable. If provided, it will replace the remote_host which was defined in ssh_hook or predefined in the connection of ssh_conn_id.
  • command (str) – command to execute on remote host. (templated)
  • timeout (int) – timeout (in seconds) for executing the command.
class airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This is the base operator for all SageMaker operators.

Parameters:
  • config (dict) – The configuration necessary to start a training job (templated)
  • aws_conn_id (str) – The AWS connection ID to use.
class airflow.contrib.operators.sagemaker_endpoint_config_operator.SageMakerEndpointConfigOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Create a SageMaker endpoint config.

This operator returns The ARN of the endpoint config created in Amazon SageMaker

Parameters:
class airflow.contrib.operators.sagemaker_endpoint_operator.SageMakerEndpointOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Create a SageMaker endpoint.

This operator returns The ARN of the endpoint created in Amazon SageMaker

Parameters:
  • config (dict) –

    The configuration necessary to create an endpoint.

    If you need to create a SageMaker endpoint based on an existed SageMaker model and an existed SageMaker endpoint config:

    config = endpoint_configuration;
    

    If you need to create all of SageMaker model, SageMaker endpoint-config and SageMaker endpoint:

    config = {
        'Model': model_configuration,
        'EndpointConfig': endpoint_config_configuration,
        'Endpoint': endpoint_configuration
    }
    

    For details of the configuration parameter of model_configuration see SageMaker.Client.create_model()

    For details of the configuration parameter of endpoint_config_configuration see SageMaker.Client.create_endpoint_config()

    For details of the configuration parameter of endpoint_configuration see SageMaker.Client.create_endpoint()

  • aws_conn_id (str) – The AWS connection ID to use.
  • wait_for_completion (bool) – Whether the operator should wait until the endpoint creation finishes.
  • check_interval (int) – If wait is set to True, this is the time interval, in seconds, that this operation waits before polling the status of the endpoint creation.
  • max_ingestion_time (int) – If wait is set to True, this operation fails if the endpoint creation doesn’t finish within max_ingestion_time seconds. If you set this parameter to None it never times out.
  • operation (str) – Whether to create an endpoint or update an endpoint. Must be either ‘create or ‘update’.
class airflow.contrib.operators.sagemaker_model_operator.SageMakerModelOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Create a SageMaker model.

This operator returns The ARN of the model created in Amazon SageMaker

Parameters:
  • config (dict) –

    The configuration necessary to create a model.

    For details of the configuration parameter see SageMaker.Client.create_model()

  • aws_conn_id (str) – The AWS connection ID to use.
class airflow.contrib.operators.sagemaker_training_operator.SageMakerTrainingOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Initiate a SageMaker training job.

This operator returns The ARN of the training job created in Amazon SageMaker.

Parameters:
  • config (dict) –

    The configuration necessary to start a training job (templated).

    For details of the configuration parameter see SageMaker.Client.create_training_job()

  • aws_conn_id (str) – The AWS connection ID to use.
  • wait_for_completion (bool) – If wait is set to True, the time interval, in seconds, that the operation waits to check the status of the training job.
  • print_log (bool) – if the operator should print the cloudwatch log during training
  • check_interval (int) – if wait is set to be true, this is the time interval in seconds which the operator will check the status of the training job
  • max_ingestion_time (int) – If wait is set to True, the operation fails if the training job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
class airflow.contrib.operators.sagemaker_transform_operator.SageMakerTransformOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Initiate a SageMaker transform job.

This operator returns The ARN of the model created in Amazon SageMaker.

Parameters:
  • config (dict) –

    The configuration necessary to start a transform job (templated).

    If you need to create a SageMaker transform job based on an existed SageMaker model:

    config = transform_config
    

    If you need to create both SageMaker model and SageMaker Transform job:

    config = {
        'Model': model_config,
        'Transform': transform_config
    }
    

    For details of the configuration parameter of transform_config see SageMaker.Client.create_transform_job()

    For details of the configuration parameter of model_config, See: SageMaker.Client.create_model()

  • aws_conn_id (string) – The AWS connection ID to use.
  • wait_for_completion (bool) – Set to True to wait until the transform job finishes.
  • check_interval (int) – If wait is set to True, the time interval, in seconds, that this operation waits to check the status of the transform job.
  • max_ingestion_time (int) – If wait is set to True, the operation fails if the transform job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
class airflow.contrib.operators.sagemaker_tuning_operator.SageMakerTuningOperator(**kwargs)[source]

Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator

Initiate a SageMaker hyperparameter tuning job.

This operator returns The ARN of the tuning job created in Amazon SageMaker.

Parameters:
  • config (dict) –

    The configuration necessary to start a tuning job (templated).

    For details of the configuration parameter see SageMaker.Client.create_hyper_parameter_tuning_job()

  • aws_conn_id (str) – The AWS connection ID to use.
  • wait_for_completion (bool) – Set to True to wait until the tuning job finishes.
  • check_interval (int) – If wait is set to True, the time interval, in seconds, that this operation waits to check the status of the tuning job.
  • max_ingestion_time (int) – If wait is set to True, the operation fails if the tuning job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
class airflow.contrib.operators.segment_track_event_operator.SegmentTrackEventOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Send Track Event to Segment for a specified user_id and event

Parameters:
  • user_id (str) – The ID for this user in your database. (templated)
  • event (str) – The name of the event you’re tracking. (templated)
  • properties (dict) – A dictionary of properties for the event. (templated)
  • segment_conn_id (str) – The connection ID to use when connecting to Segment.
  • segment_debug_mode (bool) – Determines whether Segment should run in debug mode. Defaults to False
class airflow.contrib.operators.slack_webhook_operator.SlackWebhookOperator(**kwargs)[source]

Bases: airflow.operators.http_operator.SimpleHttpOperator

This operator allows you to post messages to Slack using incoming webhooks. Takes both Slack webhook token directly and connection that has Slack webhook token. If both supplied, Slack webhook token will be used.

Each Slack webhook token can be pre-configured to use a specific channel, username and icon. You can override these defaults in this hook.

Parameters:
  • http_conn_id (str) – connection that has Slack webhook token in the extra field
  • webhook_token (str) – Slack webhook token
  • message (str) – The message you want to send on Slack
  • attachments (list) – The attachments to send on Slack. Should be a list of dictionaries representing Slack attachments.
  • channel (str) – The channel the message should be posted to
  • username (str) – The username to post to slack with
  • icon_emoji (str) – The emoji to use as icon for the user posting to Slack
  • link_names (bool) – Whether or not to find and link channel and usernames in your message
  • proxy (str) – Proxy to use to make the Slack webhook call
execute(context)[source]

Call the SlackWebhookHook to post the provided Slack message

class airflow.contrib.operators.snowflake_operator.SnowflakeOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a Snowflake database

Parameters:
  • snowflake_conn_id (str) – reference to specific snowflake connection id
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed. (templated)
  • warehouse (str) – name of warehouse which overwrite defined one in connection
  • database (str) – name of database which overwrite defined one in connection
class airflow.contrib.operators.sns_publish_operator.SnsPublishOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Publish a message to Amazon SNS.

Parameters:
  • aws_conn_id (str) – aws connection to use
  • target_arn (str) – either a TopicArn or an EndpointArn
  • message (str) – the default message you want to send (templated)
class airflow.contrib.operators.spark_jdbc_operator.SparkJDBCOperator(**kwargs)[source]

Bases: airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator

This operator extends the SparkSubmitOperator specifically for performing data transfers to/from JDBC-based databases with Apache Spark. As with the SparkSubmitOperator, it assumes that the “spark-submit” binary is available on the PATH.

Parameters:
  • spark_app_name (str) – Name of the job (default airflow-spark-jdbc)
  • spark_conn_id (str) – Connection id as configured in Airflow administration
  • spark_conf (dict) – Any additional Spark configuration properties
  • spark_py_files (str) – Additional python files used (.zip, .egg, or .py)
  • spark_files (str) – Additional files to upload to the container running the job
  • spark_jars (str) – Additional jars to upload and add to the driver and executor classpath
  • num_executors (int) – number of executor to run. This should be set so as to manage the number of connections made with the JDBC database
  • executor_cores (int) – Number of cores per executor
  • executor_memory (str) – Memory per executor (e.g. 1000M, 2G)
  • driver_memory (str) – Memory allocated to the driver (e.g. 1000M, 2G)
  • verbose (bool) – Whether to pass the verbose flag to spark-submit for debugging
  • keytab (str) – Full path to the file that contains the keytab
  • principal (str) – The name of the kerberos principal used for keytab
  • cmd_type (str) – Which way the data should flow. 2 possible values: spark_to_jdbc: data written by spark from metastore to jdbc jdbc_to_spark: data written by spark from jdbc to metastore
  • jdbc_table (str) – The name of the JDBC table
  • jdbc_conn_id (str) – Connection id used for connection to JDBC database
  • jdbc_driver (str) – Name of the JDBC driver to use for the JDBC connection. This driver (usually a jar) should be passed in the ‘jars’ parameter
  • metastore_table (str) – The name of the metastore table,
  • jdbc_truncate (bool) – (spark_to_jdbc only) Whether or not Spark should truncate or drop and recreate the JDBC table. This only takes effect if ‘save_mode’ is set to Overwrite. Also, if the schema is different, Spark cannot truncate, and will drop and recreate
  • save_mode (str) – The Spark save-mode to use (e.g. overwrite, append, etc.)
  • save_format (str) – (jdbc_to_spark-only) The Spark save-format to use (e.g. parquet)
  • batch_size (int) – (spark_to_jdbc only) The size of the batch to insert per round trip to the JDBC database. Defaults to 1000
  • fetch_size (int) – (jdbc_to_spark only) The size of the batch to fetch per round trip from the JDBC database. Default depends on the JDBC driver
  • num_partitions (int) – The maximum number of partitions that can be used by Spark simultaneously, both for spark_to_jdbc and jdbc_to_spark operations. This will also cap the number of JDBC connections that can be opened
  • partition_column (str) – (jdbc_to_spark-only) A numeric column to be used to partition the metastore table by. If specified, you must also specify: num_partitions, lower_bound, upper_bound
  • lower_bound (int) – (jdbc_to_spark-only) Lower bound of the range of the numeric partition column to fetch. If specified, you must also specify: num_partitions, partition_column, upper_bound
  • upper_bound (int) – (jdbc_to_spark-only) Upper bound of the range of the numeric partition column to fetch. If specified, you must also specify: num_partitions, partition_column, lower_bound
  • create_table_column_types – (spark_to_jdbc-only) The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: “name CHAR(64), comments VARCHAR(1024)”). The specified types should be valid spark sql data types.
execute(context)[source]

Call the SparkSubmitHook to run the provided spark job

class airflow.contrib.operators.spark_sql_operator.SparkSqlOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute Spark SQL query

Parameters:
  • sql (str) – The SQL query to execute. (templated)
  • conf (str (format: PROP=VALUE)) – arbitrary Spark configuration property
  • conn_id (str) – connection_id string
  • total_executor_cores (int) – (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
  • executor_cores (int) – (Standalone & YARN only) Number of cores per executor (Default: 2)
  • executor_memory (str) – Memory per executor (e.g. 1000M, 2G) (Default: 1G)
  • keytab (str) – Full path to the file that contains the keytab
  • master (str) – spark://host:port, mesos://host:port, yarn, or local
  • name (str) – Name of the job
  • num_executors (int) – Number of executors to launch
  • verbose (bool) – Whether to pass the verbose flag to spark-sql
  • yarn_queue (str) – The YARN queue to submit to (Default: “default”)
execute(context)[source]

Call the SparkSqlHook to run the provided sql query

class airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. It requires that the “spark-submit” binary is in the PATH or the spark-home is set in the extra on the connection.

Parameters:
  • application (str) – The application that submitted as a job, either jar or py file. (templated)
  • conf (dict) – Arbitrary Spark configuration properties (templated)
  • conn_id (str) – The connection id as configured in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
  • files (str) – Upload additional files to the executor running the job, separated by a comma. Files will be placed in the working directory of each executor. For example, serialized objects. (templated)
  • py_files (str) – Additional python files used by the job, can be .zip, .egg or .py. (templated)
  • jars (str) – Submit additional jars to upload and place them in executor classpath. (templated)
  • driver_classpath (str) – Additional, driver-specific, classpath settings. (templated)
  • java_class (str) – the main class of the Java application
  • packages (str) – Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. (templated)
  • exclude_packages (str) – Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in ‘packages’ (templated)
  • repositories (str) – Comma-separated list of additional remote repositories to search for the maven coordinates given with ‘packages’
  • total_executor_cores (int) – (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
  • executor_cores (int) – (Standalone & YARN only) Number of cores per executor (Default: 2)
  • executor_memory (str) – Memory per executor (e.g. 1000M, 2G) (Default: 1G)
  • driver_memory (str) – Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)
  • keytab (str) – Full path to the file that contains the keytab (templated)
  • principal (str) – The name of the kerberos principal used for keytab (templated)
  • name (str) – Name of the job (default airflow-spark). (templated)
  • num_executors (int) – Number of executors to launch
  • application_args (list) – Arguments for the application being submitted (templated)
  • env_vars (dict) – Environment variables for spark-submit. It supports yarn and k8s mode too. (templated)
  • verbose (bool) – Whether to pass the verbose flag to spark-submit process for debugging
  • spark_binary (string) – The command to use for spark submit. Some distros may use spark2-submit.
execute(context)[source]

Call the SparkSubmitHook to run the provided spark job

class airflow.contrib.operators.sqoop_operator.SqoopOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a Sqoop job. Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html

execute(context)[source]

Execute sqoop job

class airflow.contrib.operators.vertica_operator.VerticaOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Vertica database

Parameters:
  • vertica_conn_id (str) – reference to a specific Vertica database
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed. (templated)
class airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Vertica to Hive. The operator runs your query against Vertica, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the Vertica database. (templated)
  • hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values. (templated)
  • delimiter (str) – field delimiter in the file
  • vertica_conn_id (str) – source Vertica connection
  • hive_conn_id (str) – destination hive connection
class airflow.contrib.operators.vertica_to_mysql.VerticaToMySqlTransfer(**kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Vertica to MySQL.

Parameters:
  • sql (str) – SQL query to execute against the Vertica database. (templated)
  • vertica_conn_id (str) – source Vertica connection
  • mysql_table (str) – target MySQL table, use dot notation to target a specific database. (templated)
  • mysql_conn_id (str) – source mysql connection
  • mysql_preoperator (str) – sql statement to run against MySQL prior to import, typically use to truncate of delete in place of the data coming in, allowing the task to be idempotent (running the task twice won’t double load data). (templated)
  • mysql_postoperator (str) – sql statement to run against MySQL after the import, typically used to move data from staging to production and issue cleanup commands. (templated)
  • bulk_load (bool) – flag to use bulk_load option. This loads MySQL directly from a tab-delimited text file using the LOAD DATA LOCAL INFILE command. This option requires an extra connection parameter for the destination MySQL connection: {‘local_infile’: true}.
class airflow.contrib.operators.wasb_delete_blob_operator.WasbDeleteBlobOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes blob(s) on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container. (templated)
  • blob_name (str) – Name of the blob. (templated)
  • wasb_conn_id (str) – Reference to the wasb connection.
  • check_options – Optional keyword arguments that WasbHook.check_for_blob() takes.
  • is_prefix (bool) – If blob_name is a prefix, delete all files matching prefix.
  • ignore_if_missing (bool) – if True, then return success even if the blob does not exist.
class airflow.contrib.operators.winrm_operator.WinRMOperator(**kwargs)[source]

Bases: airflow.models.BaseOperator

WinRMOperator to execute commands on given remote host using the winrm_hook.

Parameters:
  • winrm_hook (airflow.contrib.hooks.winrm_hook.WinRMHook) – predefined ssh_hook to use for remote execution
  • ssh_conn_id (str) – connection id from airflow Connections
  • remote_host (str) – remote host to connect
  • command (str) – command to execute on remote host. (templated)
  • timeout (int) – timeout for executing the command.

Sensors

class airflow.contrib.sensors.aws_athena_sensor.AthenaSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Asks for the state of the Query until it reaches a failure state or success state. If it fails, failing the task.

Parameters:
  • query_execution_id (str) – query_execution_id to check the state of
  • max_retires (int) – Number of times to poll for query state before returning the current state, defaults to None
  • aws_conn_id (str) – aws connection to use, defaults to ‘aws_default’
  • sleep_time (int) – Time to wait between two consecutive call to check query status on athena, defaults to 10
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.aws_glue_catalog_partition_sensor.AwsGlueCatalogPartitionSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a partition to show up in AWS Glue Catalog.

Parameters:
  • table_name (str) – The name of the table to wait for, supports the dot notation (my_database.my_table)
  • expression (str) – The partition clause to wait for. This is passed as is to the AWS Glue Catalog API’s get_partitions function, and supports SQL like notation as in ds='2015-01-01' AND type='value' and comparison operators as in "ds>=2015-01-01". See https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-partitions.html #aws-glue-api-catalog-partitions-GetPartitions
  • aws_conn_id (str) – ID of the Airflow connection where credentials and extra configuration are stored
  • region_name (str) – Optional aws region name (example: us-east-1). Uses region from connection if not specified.
  • database_name (str) – The name of the catalog database where the partitions reside.
  • poke_interval (int) – Time in seconds that the job should wait in between each tries
get_hook()[source]

Gets the AwsGlueCatalogHook

poke(context)[source]

Checks for existence of the partition in the AWS Glue Catalog table

class airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a Redshift cluster to reach a specific status.

Parameters:
  • cluster_identifier (str) – The identifier for the cluster being pinged.
  • target_status (str) – The cluster status desired.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a document which matches the given query in CosmosDB. Example:

>>> azure_cosmos_sensor = AzureCosmosDocumentSensor(database_name="somedatabase_name",
...                            collection_name="somecollection_name",
...                            document_id="unique-doc-id",
...                            azure_cosmos_conn_id="azure_cosmos_default",
...                            task_id="azure_cosmos_sensor")
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.bash_sensor.BashSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Executes a bash command/script and returns True if and only if the return code is 0.

Parameters:
  • bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed.
  • env (dict) – If env is not None, it must be a mapping that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated)
  • output_encoding (str) – output encoding of bash command.
poke(context)[source]

Execute the bash command in a temporary directory which will be cleaned afterwards

class airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a table in Google Bigquery.

Parameters:
  • project_id (str) – The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
  • dataset_id (str) – The name of the dataset in which to look for the table. storage bucket.
  • table_id (str) – The name of the table to check the existence of.
  • bigquery_conn_id (str) – The connection ID to use when connecting to Google BigQuery.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.cassandra_record_sensor.CassandraRecordSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a record in a Cassandra cluster.

For example, if you want to wait for a record that has values ‘v1’ and ‘v2’ for each primary keys ‘p1’ and ‘p2’ to be populated in keyspace ‘k’ and table ‘t’, instantiate it as follows:

>>> cassandra_sensor = CassandraRecordSensor(table="k.t",
...                                          keys={"p1": "v1", "p2": "v2"},
...                                          cassandra_conn_id="cassandra_default",
...                                          task_id="cassandra_sensor")
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.cassandra_table_sensor.CassandraTableSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a table in a Cassandra cluster.

For example, if you want to wait for a table called ‘t’ to be created in a keyspace ‘k’, instantiate it as follows:

>>> cassandra_sensor = CassandraTableSensor(table="k.t",
...                                         cassandra_conn_id="cassandra_default",
...                                         task_id="cassandra_sensor")
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.celery_queue_sensor.CeleryQueueSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a Celery queue to be empty. By default, in order to be considered empty, the queue must not have any tasks in the reserved, scheduled or active states.

Parameters:
  • celery_queue (str) – The name of the Celery queue to wait for.
  • target_task_id (str) – Task id for checking
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.datadog_sensor.DatadogSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

A sensor to listen, with a filter, to datadog event streams and determine if some event was emitted.

Depends on the datadog API, which has to be deployed on the same server where Airflow runs.

Parameters:
  • datadog_conn_id – The connection to datadog, containing metadata for api keys.
  • datadog_conn_id – str
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.weekday_sensor.DayOfWeekSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits until the first specified day of the week. For example, if the execution day of the task is ‘2018-12-22’ (Saturday) and you pass ‘FRIDAY’, the task will wait until next Friday.

Example (with single day):

weekend_check = DayOfWeekSensor(
    task_id='weekend_check',
    week_day='Saturday',
    use_task_execution_day=True,
    dag=dag)

Example (with multiple day using set):

weekend_check = DayOfWeekSensor(
    task_id='weekend_check',
    week_day={'Saturday', 'Sunday'},
    use_task_execution_day=True,
    dag=dag)

Example (with WeekDay enum):

# import WeekDay Enum
from airflow.contrib.utils.weekday import WeekDay

weekend_check = DayOfWeekSensor(
    task_id='weekend_check',
    week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
    use_task_execution_day=True,
    dag=dag)
Parameters:
  • week_day (set or str or airflow.contrib.utils.weekday.WeekDay) –

    Day of the week to check (full name). Optionally, a set of days can also be provided using a set. Example values:

    • "MONDAY",
    • {"Saturday", "Sunday"}
    • {WeekDay.TUESDAY}
    • {WeekDay.SATURDAY, WeekDay.SUNDAY}
  • use_task_execution_day (bool) – If True, uses task’s execution day to compare with week_day. Execution Date is Useful for backfilling. If False, uses system’s day of the week. Useful when you don’t want to run anything on weekdays on the system.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.emr_base_sensor.EmrBaseSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Contains general sensor behavior for EMR. Subclasses should implement get_emr_response() and state_from_response() methods. Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE constants.

poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.emr_job_flow_sensor.EmrJobFlowSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.emr_base_sensor.EmrBaseSensor

Asks for the state of the JobFlow until it reaches a terminal state. If it fails the sensor errors, failing the task.

Parameters:job_flow_id (str) – job_flow_id to check the state of
class airflow.contrib.sensors.emr_step_sensor.EmrStepSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.emr_base_sensor.EmrBaseSensor

Asks for the state of the step until it reaches a terminal state. If it fails the sensor errors, failing the task.

Parameters:
  • job_flow_id (str) – job_flow_id which contains the step check the state of
  • step_id (str) – step to check the state of
class airflow.contrib.sensors.ftp_sensor.FTPSSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.ftp_sensor.FTPSensor

Waits for a file or directory to be present on FTP over SSL.

class airflow.contrib.sensors.ftp_sensor.FTPSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a file or directory to be present on FTP.

poke(context)[source]

Function that the sensors defined while deriving this class should override.

template_fields = ('path',)

Errors that are transient in nature, and where action can be retried

class airflow.contrib.sensors.file_sensor.FileSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a file or folder to land in a filesystem.

If the path given is a directory then this sensor will only return true if any files exist inside it (either directly, or within a subdirectory)

Parameters:
  • fs_conn_id (str) – reference to the File (path) connection id
  • filepath – File or folder name (relative to the base path set within the connection)
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a file in Google Cloud Storage.

Parameters:
  • bucket (str) – The Google cloud storage bucket where the object is.
  • object (str) – The name of the object to check in the Google cloud storage bucket.
  • google_cloud_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectUpdatedSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks if an object is updated in Google Cloud Storage.

Parameters:
  • bucket (str) – The Google cloud storage bucket where the object is.
  • object (str) – The name of the object to download in the Google cloud storage bucket.
  • ts_func (function) – Callback for defining the update condition. The default callback returns execution_date + schedule_interval. The callback takes the context as parameter.
  • google_cloud_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.gcs_sensor.GoogleCloudStoragePrefixSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a files at prefix in Google Cloud Storage bucket.

Parameters:
  • bucket (str) – The Google cloud storage bucket where the object is.
  • prefix (str) – The name of the prefix to check in the Google cloud storage bucket.
  • google_cloud_conn_id (str) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.imap_attachment_sensor.ImapAttachmentSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a specific attachment on a mail server.

Parameters:
  • attachment_name (str) – The name of the attachment that will be checked.
  • check_regex (bool) – If set to True the attachment’s name will be parsed as regular expression. Through this you can get a broader set of attachments that it will look for than just only the equality of the attachment name. The default value is False.
  • mail_folder (str) – The mail folder in where to search for the attachment. The default value is ‘INBOX’.
  • conn_id (str) – The connection to run the sensor against. The default value is ‘imap_default’.
poke(context)[source]

Pokes for a mail attachment on the mail server.

Parameters:context (dict) – The context that is being provided when poking.
Returns:True if attachment with the given name is present and False if not.
Return type:bool
class airflow.contrib.sensors.jira_sensor.JiraSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Monitors a jira ticket for any change.

Parameters:
  • jira_conn_id (str) – reference to a pre-defined Jira Connection
  • method_name (str) – method name from jira-python-sdk to be execute
  • method_params (dict) – parameters for the method method_name
  • result_processor (function) – function that return boolean and act as a sensor response
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.jira_sensor.JiraTicketSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.jira_sensor.JiraSensor

Monitors a jira ticket for given change in terms of function.

Parameters:
  • jira_conn_id (str) – reference to a pre-defined Jira Connection
  • ticket_id (str) – id of the ticket to be monitored
  • field (str) – field of the ticket to be monitored
  • expected_value (str) – expected value of the field
  • result_processor (function) – function that return boolean and act as a sensor response
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.mongo_sensor.MongoSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a document which matches the given query in MongoDB. Example:

>>> mongo_sensor = MongoSensor(collection="coll",
...                            query={"key": "value"},
...                            mongo_conn_id="mongo_default",
...                            task_id="mongo_sensor")
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Pulls messages from a PubSub subscription and passes them through XCom.

This sensor operator will pull up to max_messages messages from the specified PubSub subscription. When the subscription returns messages, the poke method’s criteria will be fulfilled and the messages will be returned from the operator and passed through XCom for downstream tasks.

If ack_messages is set to True, messages will be immediately acknowledged before being returned, otherwise, downstream tasks will be responsible for acknowledging them.

project and subscription are templated so you can use variables in them.

execute(context)[source]

Overridden to allow messages to be passed

poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.python_sensor.PythonSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a Python callable to return True.

User could put input argument in templates_dict e.g templates_dict = {'start_ds': 1970} and access the argument by calling kwargs['templates_dict']['start_ds'] in the the callable

Parameters:
  • python_callable (python callable) – A reference to an object that is callable
  • op_kwargs (dict) – a dictionary of keyword arguments that will get unpacked in your function
  • op_args (list) – a list of positional arguments that will get unpacked when calling your callable
  • provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
  • templates_dict (dict of str) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place and are made available in your callable’s context after the template has been applied.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.qubole_sensor.QuboleFileSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.qubole_sensor.QuboleSensor

Wait for a file or folder to be present in cloud storage and check for its presence via QDS APIs

Parameters:
  • qubole_conn_id (str) – Connection id which consists of qds auth_token
  • data (a JSON object) – a JSON object containing payload, whose presence needs to be checked Check this example for sample payload structure.

Note

Both data and qubole_conn_id fields support templating. You can also use .txt files for template-driven use cases.

class airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.qubole_sensor.QuboleSensor

Wait for a Hive partition to show up in QHS (Qubole Hive Service) and check for its presence via QDS APIs

Parameters:
  • qubole_conn_id (str) – Connection id which consists of qds auth_token
  • data (a JSON object) –

    a JSON object containing payload, whose presence needs to be checked. Check this example for sample payload structure.

Note

Both data and qubole_conn_id fields support templating. You can also use .txt files for template-driven use cases.

class airflow.contrib.sensors.qubole_sensor.QuboleSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Base class for all Qubole Sensors

poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.redis_key_sensor.RedisKeySensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Checks for the existence of a key in a Redis

poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.sftp_sensor.SFTPSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a file or directory to be present on SFTP.

Parameters:
  • path (str) – Remote file or directory path
  • sftp_conn_id (str) – The connection to run the sensor against
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.sagemaker_base_sensor.SageMakerBaseSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Contains general sensor behavior for SageMaker. Subclasses should implement get_sagemaker_response() and state_from_response() methods. Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE methods.

poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.sagemaker_endpoint_sensor.SageMakerEndpointSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.sagemaker_base_sensor.SageMakerBaseSensor

Asks for the state of the endpoint state until it reaches a terminal state. If it fails the sensor errors, the task fails.

Parameters:job_name (str) – job_name of the endpoint instance to check the state of
class airflow.contrib.sensors.sagemaker_training_sensor.SageMakerTrainingSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.sagemaker_base_sensor.SageMakerBaseSensor

Asks for the state of the training state until it reaches a terminal state. If it fails the sensor errors, failing the task.

Parameters:
  • job_name (str) – name of the SageMaker training job to check the state of
  • print_log (bool) – if the operator should print the cloudwatch log
class airflow.contrib.sensors.sagemaker_transform_sensor.SageMakerTransformSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.sagemaker_base_sensor.SageMakerBaseSensor

Asks for the state of the transform state until it reaches a terminal state. The sensor will error if the job errors, throwing a AirflowException containing the failure reason.

Parameters:job_name (string) – job_name of the transform job instance to check the state of
class airflow.contrib.sensors.sagemaker_tuning_sensor.SageMakerTuningSensor(**kwargs)[source]

Bases: airflow.contrib.sensors.sagemaker_base_sensor.SageMakerBaseSensor

Asks for the state of the tuning state until it reaches a terminal state. The sensor will error if the job errors, throwing a AirflowException containing the failure reason.

Parameters:job_name (str) – job_name of the tuning instance to check the state of
class airflow.contrib.sensors.wasb_sensor.WasbBlobSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a blob to arrive on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • wasb_conn_id (str) – Reference to the wasb connection.
  • check_options (dict) – Optional keyword arguments that WasbHook.check_for_blob() takes.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

class airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor(**kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for blobs matching a prefix to arrive on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container.
  • prefix (str) – Prefix of the blob.
  • wasb_conn_id (str) – Reference to the wasb connection.
  • check_options (dict) – Optional keyword arguments that WasbHook.check_for_prefix() takes.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

Macros

Here’s a list of variables and macros that can be used in templates

Default Variables

The Airflow engine passes a few variables by default that are accessible in all templates

Variable Description
{{ ds }} the execution date as YYYY-MM-DD
{{ ds_nodash }} the execution date as YYYYMMDD
{{ prev_ds }} the previous execution date as YYYY-MM-DD if {{ ds }} is 2018-01-08 and schedule_interval is @weekly, {{ prev_ds }} will be 2018-01-01
{{ prev_ds_nodash }} the previous execution date as YYYYMMDD if exists, else None
{{ next_ds }} the next execution date as YYYY-MM-DD if {{ ds }} is 2018-01-01 and schedule_interval is @weekly, {{ next_ds }} will be 2018-01-08
{{ next_ds_nodash }} the next execution date as YYYYMMDD if exists, else None
{{ yesterday_ds }} the day before the execution date as YYYY-MM-DD
{{ yesterday_ds_nodash }} the day before the execution date as YYYYMMDD
{{ tomorrow_ds }} the day after the execution date as YYYY-MM-DD
{{ tomorrow_ds_nodash }} the day after the execution date as YYYYMMDD
{{ ts }} same as execution_date.isoformat(). Example: 2018-01-01T00:00:00+00:00
{{ ts_nodash }} same as ts without -, : and TimeZone info. Example: 20180101T000000
{{ ts_nodash_with_tz }} same as ts without - and :. Example: 20180101T000000+0000
{{ execution_date }} the execution_date (pendulum.Pendulum)
{{ prev_execution_date }} the previous execution date (if available) (pendulum.Pendulum)
{{ next_execution_date }} the next execution date (pendulum.Pendulum)
{{ dag }} the DAG object
{{ task }} the Task object
{{ macros }} a reference to the macros package, described below
{{ task_instance }} the task_instance object
{{ end_date }} same as {{ ds }}
{{ latest_date }} same as {{ ds }}
{{ ti }} same as {{ task_instance }}
{{ params }} a reference to the user-defined params dictionary which can be overridden by the dictionary passed through trigger_dag -c if you enabled dag_run_conf_overrides_params` in ``airflow.cfg
{{ var.value.my_var }} global defined variables represented as a dictionary
{{ var.json.my_var.path }} global defined variables represented as a dictionary with deserialized JSON object, append the path to the key within the JSON object
{{ task_instance_key_str }} a unique, human-readable key to the task instance formatted {dag_id}_{task_id}_{ds}
{{ conf }} the full configuration object located at airflow.configuration.conf which represents the content of your airflow.cfg
{{ run_id }} the run_id of the current DAG run
{{ dag_run }} a reference to the DagRun object
{{ test_mode }} whether the task instance was called using the CLI’s test subcommand

Note that you can access the object’s attributes and methods with simple dot notation. Here are some examples of what is possible: {{ task.owner }}, {{ task.task_id }}, {{ ti.hostname }}, … Refer to the models documentation for more information on the objects’ attributes and methods.

The var template variable allows you to access variables defined in Airflow’s UI. You can access them as either plain-text or JSON. If you use JSON, you are also able to walk nested structures, such as dictionaries like: {{ var.json.my_dict_var.key1 }}

Macros

Macros are a way to expose objects to your templates and live under the macros namespace in your templates.

A few commonly used libraries and methods are made available.

Variable Description
macros.datetime The standard lib’s datetime.datetime
macros.timedelta The standard lib’s datetime.timedelta
macros.dateutil A reference to the dateutil package
macros.time The standard lib’s time
macros.uuid The standard lib’s uuid
macros.random The standard lib’s random

Some airflow specific macros are also defined:

airflow.macros.ds_add(ds, days)[source]

Add or subtract days from a YYYY-MM-DD

Parameters:
  • ds (str) – anchor date in YYYY-MM-DD format to add to
  • days (int) – number of days to add to the ds, you can use negative values
>>> ds_add('2015-01-01', 5)
'2015-01-06'
>>> ds_add('2015-01-06', -5)
'2015-01-01'
airflow.macros.ds_format(ds, input_format, output_format)[source]

Takes an input string and outputs another string as specified in the output format

Parameters:
  • ds (str) – input string which contains a date
  • input_format (str) – input string format. E.g. %Y-%m-%d
  • output_format (str) – output string format E.g. %Y-%m-%d
>>> ds_format('2015-01-01', "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format('1/5/2015', "%m/%d/%Y",  "%Y-%m-%d")
'2015-01-05'
airflow.macros.random() → x in the interval [0, 1).
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default')[source]

This function finds the date in a list closest to the target date. An optional parameter can be given to get the closest before or after.

Parameters:
  • table (str) – A hive table name
  • ds (list[datetime.date]) – A datestamp %Y-%m-%d e.g. yyyy-mm-dd
  • before (bool or None) – closest before (True), after (False) or either side of ds
Returns:

The closest date

Return type:

str or None

>>> tbl = 'airflow.static_babynames_partitioned'
>>> closest_ds_partition(tbl, '2015-01-02')
'2015-01-01'
airflow.macros.hive.max_partition(table, schema='default', field=None, filter_map=None, metastore_conn_id='metastore_default')[source]

Gets the max partition for a table.

Parameters:
  • schema (str) – The hive schema the table lives in
  • table (str) – The hive table you are interested in, supports the dot notation as in “my_database.my_table”, if a dot is found, the schema param is disregarded
  • metastore_conn_id (str) – The hive connection you are interested in. If your default is set you don’t need to use this parameter.
  • filter_map (map) – partition_key:partition_value map used for partition filtering, e.g. {‘key1’: ‘value1’, ‘key2’: ‘value2’}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition.
  • field (str) – the field to get the max value from. If there’s only one partition field, this will be inferred
>>> max_partition('airflow.static_babynames_partitioned')
'2015-01-01'

Models

Models are built on top of the SQLAlchemy ORM Base class, and instances are persisted in the database.

class airflow.models.BaseOperator(**kwargs)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the ‘execute’ method.

Operators derived from this class should perform or trigger certain tasks synchronously (wait for completion). Example of operators could be an operator that runs a Pig job (PigOperator), a sensor operator that waits for a partition to land in Hive (HiveSensorOperator), or one that moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these operators (tasks) target specific operations, running specific scripts, functions or data transfers.

This class is abstract and shouldn’t be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a node in DAG objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods.

Parameters:
  • task_id (str) – a unique, meaningful id for the task
  • owner (str) – the owner of the task, using the unix username is recommended
  • retries (int) – the number of retries that should be performed before failing the task
  • retry_delay (datetime.timedelta) – delay between retries
  • retry_exponential_backoff (bool) – allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
  • max_retry_delay (datetime.timedelta) – maximum delay interval between retries
  • start_date (datetime.datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest execution_date and adds the schedule_interval to determine the next execution_date. It is also very important to note that different tasks’ dependencies need to line up in time. If task A depends on task B and their start_date are offset in a way that their execution_date don’t line up, A’s dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into the TimeSensor and TimeDeltaSensor. We advise against using dynamic start_date and recommend using fixed ones. Read the FAQ entry about start_date for more information.
  • end_date (datetime.datetime) – if specified, the scheduler won’t go beyond this date
  • depends_on_past (bool) – when set to true, task instances will run sequentially while relying on the previous task’s schedule to succeed. The task instance for the start_date is allowed to run.
  • wait_for_downstream (bool) – when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used.
  • queue (str) – which queue to target when running this job. Not all executors implement queue management, the CeleryExecutor does support targeting specific queues.
  • dag (airflow.models.DAG) – a reference to the dag the task is attached to (if any)
  • priority_weight (int) – priority weight of this task against other task. This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks.
  • weight_rule (str) – weighting method used for the effective total priority weight of the task. Options are: { downstream | upstream | absolute } default is downstream When set to downstream the effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks will have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and desire to have all upstream tasks to complete for all runs before each dag can continue processing downstream tasks. When set to upstream the effective weight is the aggregate sum of all upstream ancestors. This is the opposite where downtream tasks have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and prefer to have each dag complete before starting upstream tasks of other dags. When set to absolute, the effective weight is the exact priority_weight specified without additional weighting. You may want to do this when you know exactly what priority weight each task should have. Additionally, when set to absolute, there is bonus effect of significantly speeding up the task creation process as for very large DAGS. Options can be set as string or using the constants defined in the static class airflow.utils.WeightRule
  • pool (str) – the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
  • sla (datetime.timedelta) – time by which the job is expected to succeed. Note that this represents the timedelta after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on the 2016-01-02 if the 2016-01-01 instance has not succeeded yet. The scheduler pays special attention for jobs with an SLA and sends alert emails for sla misses. SLA misses are also recorded in the database for future reference. All tasks that share the same SLA time get bundled in a single email, sent soon after that time. SLA notification are sent once and only once for each task instance.
  • execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.
  • on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
  • on_retry_callback (callable) – much like the on_failure_callback except that it is executed when retries occur.
  • on_success_callback (callable) – much like the on_failure_callback except that it is executed when the task succeeds.
  • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | none_failed | dummy} default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule
  • resources (dict) – A map of resource parameter names (the argument names of the Resources constructor) to their values.
  • run_as_user (str) – unix username to impersonate while running the task
  • task_concurrency (int) – When set, a task will be able to limit the concurrent runs across execution_dates
  • executor_config (dict) –

    Additional task-level configuration parameters that are interpreted by a specific executor. Parameters are namespaced by the name of executor.

    Example: to run this task in a specific docker container through the KubernetesExecutor

    MyOperator(...,
        executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
            }
    )
    
  • do_xcom_push (bool) – if True, an XCom is pushed containing the Operator’s result
clear(**kwargs)[source]

Clears the state of task instances associated with the task, following the parameters specified.

dag

Returns the Operator’s DAG if set, otherwise raises an error

deps

Returns the list of dependencies for the operator. These differ from execution context dependencies in that they are specific to tasks and can be extended/overridden by subclasses.

downstream_list

@property: list of tasks directly downstream

execute(context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

get_direct_relative_ids(upstream=False)[source]

Get the direct relative ids to the current task, upstream or downstream.

get_direct_relatives(upstream=False)[source]

Get the direct relatives to the current task, upstream or downstream.

get_flat_relative_ids(upstream=False, found_descendants=None)[source]

Get a flat list of relatives’ ids, either upstream or downstream.

get_flat_relatives(upstream=False)[source]

Get a flat list of relatives, either upstream or downstream.

get_task_instances(session, start_date=None, end_date=None)[source]

Get a set of task instance related to this task for a specific date range.

has_dag()[source]

Returns True if the Operator has been assigned to a DAG.

on_kill()[source]

Override this method to cleanup subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.

post_execute(context, *args, **kwargs)[source]

This hook is triggered right after self.execute() is called. It is passed the execution context and any results returned by the operator.

pre_execute(context, *args, **kwargs)[source]

This hook is triggered right before self.execute() is called.

prepare_template()[source]

Hook that is triggered after the templated fields get replaced by their content. If you need your operator to alter the content of the file before the template is rendered, it should override this method to do so.

render_template(attr, content, context)[source]

Renders a template either from a file or directly in a field, and returns the rendered result.

render_template_from_field(attr, content, context, jinja_env)[source]

Renders a template from a field. If the field is a string, it will simply render the string and return the result. If it is a collection or nested set of collections, it will traverse the structure and render all elements in it. If the field has another type, it will return it as it is.

run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False)[source]

Run a set of task instances for a date range.

schedule_interval

The schedule interval of the DAG always wins over individual tasks so that tasks within a DAG always line up. The task still needs a schedule_interval as it may not be attached to a DAG.

set_downstream(task_or_task_list)[source]

Set a task or a task list to be directly downstream from the current task.

set_upstream(task_or_task_list)[source]

Set a task or a task list to be directly upstream from the current task.

upstream_list

@property: list of tasks directly upstream

xcom_pull(context, task_ids=None, dag_id=None, key=u'return_value', include_prior_dates=None)[source]

See TaskInstance.xcom_pull()

xcom_push(context, key, value, execution_date=None)[source]

See TaskInstance.xcom_push()

class airflow.models.DAG(dag_id, description=u'', schedule_interval=datetime.timedelta(1), start_date=None, end_date=None, full_filepath=None, template_searchpath=None, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=16, max_active_runs=16, dagrun_timeout=None, sla_miss_callback=None, default_view=None, orientation='LR', catchup=True, on_success_callback=None, on_failure_callback=None, params=None, access_control=None)[source]

Bases: airflow.dag.base_dag.BaseDag, airflow.utils.log.logging_mixin.LoggingMixin

A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule, (say daily or hourly), the DAG needs to run each individual tasks as their dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can’t run until their previous schedule (and upstream tasks) are completed.

DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.

Parameters:
  • dag_id (str) – The id of the DAG
  • description (str) – The description for the DAG to e.g. be shown on the webserver
  • schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule
  • start_date (datetime.datetime) – The timestamp from which the scheduler will attempt to backfill
  • end_date (datetime.datetime) – A date beyond which your DAG won’t run, leave to None for open ended scheduling
  • template_searchpath (str or list[str]) – This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default
  • user_defined_macros (dict) – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.
  • user_defined_filters (dict) – a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG.
  • default_args (dict) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.
  • params (dict) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
  • concurrency (int) – the number of task instances allowed to run concurrently
  • max_active_runs (int) – maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won’t create new active DAG runs
  • dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created
  • sla_miss_callback (types.FunctionType) – specify a function to call when reporting SLA timeouts.
  • default_view (str) – Specify DAG default view (tree, graph, duration, gantt, landing_times)
  • orientation (str) – Specify DAG orientation in graph view (LR, TB, RL, BT)
  • catchup (bool) – Perform scheduler catchup (or only run latest)? Defaults to True
  • on_failure_callback (callable) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
  • on_success_callback (callable) – Much like the on_failure_callback except that it is executed when the dag succeeds.
  • access_control (dict) – Specify optional DAG-level permissions, e.g., “{‘role1’: {‘can_dag_read’}, ‘role2’: {‘can_dag_read’, ‘can_dag_edit’}}”
add_task(task)[source]

Add a task to the DAG

Parameters:task (task) – the task you want to add
add_tasks(tasks)[source]

Add a list of tasks to the DAG

Parameters:tasks (list of tasks) – a lit of tasks you want to add
clear(**kwargs)[source]

Clears a set of task instances associated with the current dag for a specified date range.

cli()[source]

Exposes a CLI specific to this DAG

concurrency_reached

Returns a boolean indicating whether the concurrency limit for this DAG has been reached

create_dagrun(**kwargs)[source]

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.

Parameters:
  • run_id (str) – defines the the run id for this dag run
  • execution_date (datetime.datetime) – the execution date of this dag run
  • state (airflow.utils.state.State) – the state of the dag run
  • start_date (datetime) – the date this dag run should be evaluated
  • external_trigger (bool) – whether this dag run is externally triggered
  • session (sqlalchemy.orm.session.Session) – database session
static deactivate_stale_dags(*args, **kwargs)[source]

Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.

Parameters:expiration_date (datetime) – set inactive DAGs that were touched before this time
Returns:None
static deactivate_unknown_dags(*args, **kwargs)[source]

Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM

Parameters:active_dag_ids (list[unicode]) – list of DAG IDs that are active
Returns:None
filepath

File location of where the dag object is instantiated

folder

Folder location of where the dag object is instantiated

following_schedule(dttm)[source]

Calculates the following schedule for this dag in UTC.

Parameters:dttm – utc datetime
Returns:utc datetime
get_active_runs(**kwargs)[source]

Returns a list of dag run execution dates currently running

Parameters:session
Returns:List of execution dates
get_dagrun(**kwargs)[source]

Returns the dag run for a given execution date if it exists, otherwise none.

Parameters:
  • execution_date – The execution date of the DagRun to find.
  • session
Returns:

The DagRun if found, otherwise None.

get_default_view()[source]

This is only there for backward compatible jinja2 templates

get_num_active_runs(**kwargs)[source]

Returns the number of active “running” dag runs

Parameters:
  • external_trigger (bool) – True for externally triggered active dag runs
  • session
Returns:

number greater than 0 for active dag runs

static get_num_task_instances(*args, **kwargs)[source]

Returns the number of task instances in the given DAG.

Parameters:
  • session – ORM session
  • dag_id (unicode) – ID of the DAG to get the task concurrency of
  • task_ids (list[unicode]) – A list of valid task IDs for the given DAG
  • states (list[state]) – A list of states to filter by if supplied
Returns:

The number of running tasks

Return type:

int

get_run_dates(start_date, end_date=None)[source]

Returns a list of dates between the interval received as parameter using this dag’s schedule interval. Returned dates can be used for execution dates.

Parameters:
  • start_date (datetime) – the start date of the interval
  • end_date (datetime) – the end date of the interval, defaults to timezone.utcnow()
Returns:

a list of dates within the interval following the dag’s schedule

Return type:

list

get_template_env()[source]

Returns a jinja2 Environment while taking into account the DAGs template_searchpath, user_defined_macros and user_defined_filters

handle_callback(**kwargs)[source]

Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a ‘reason’, primarily to differentiate DagRun failures.

Parameters:
  • dagrun – DagRun object
  • success – Flag to specify if failure or success callback should be called
  • reason – Completion reason
  • session – Database session
is_fixed_time_schedule()[source]

Figures out if the DAG schedule has a fixed time (e.g. 3 AM).

Returns:True if the schedule has a fixed time, False if not.
is_paused

Returns a boolean indicating whether this DAG is paused

latest_execution_date

Returns the latest date for which at least one dag run exists

normalize_schedule(dttm)[source]

Returns dttm + interval unless dttm is first interval then it returns dttm

previous_schedule(dttm)[source]

Calculates the previous schedule for this dag in UTC

Parameters:dttm – utc datetime
Returns:utc datetime
run(start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=False, ignore_task_deps=False, ignore_first_depends_on_past=False, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False)[source]

Runs the DAG.

Parameters:
  • start_date (datetime.datetime) – the start date of the range to run
  • end_date (datetime.datetime) – the end date of the range to run
  • mark_success (bool) – True to mark jobs as succeeded without running them
  • local (bool) – True to run the tasks using the LocalExecutor
  • executor (BaseExecutor) – The executor instance to run the tasks
  • donot_pickle (bool) – True to avoid pickling DAG object and send to workers
  • ignore_task_deps (bool) – True to skip upstream tasks
  • ignore_first_depends_on_past (bool) – True to ignore depends_on_past dependencies for the first set of tasks only
  • pool (str) – Resource pool to use
  • delay_on_limit_secs (float) – Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached
  • verbose (bool) – Make logging output more verbose
  • conf (dict) – user defined dictionary passed from CLI
  • rerun_failed_tasks
  • run_backwards
Type:

bool

Type:

bool

set_dependency(upstream_task_id, downstream_task_id)[source]

Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()

sub_dag(task_regex, include_downstream=False, include_upstream=True)[source]

Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.

subdags

Returns a list of the subdag objects associated to this DAG

sync_to_db(**kwargs)[source]

Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

Parameters:
  • dag (airflow.models.DAG) – the DAG object to save to the DB
  • sync_time (datetime) – The time that the DAG should be marked as sync’ed
Returns:

None

test_cycle()[source]

Check to see if there are any cycles in the DAG. Returns False if no cycle found, otherwise raises exception.

topological_sort()[source]

Sorts tasks in topographical order, such that a task comes after any of its upstream dependencies.

Heavily inspired by: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/

Returns:list of tasks in topological order
tree_view()[source]

Shows an ascii tree representation of the DAG

class airflow.models.DagBag(dag_folder=None, executor=None, include_examples=True)[source]

Bases: airflow.dag.base_dag.BaseDagBag, airflow.utils.log.logging_mixin.LoggingMixin

A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks. This makes it easier to run distinct environments for say production and development, tests, or for different teams or security profiles. What would have been system level settings are now dagbag level so that one system can run multiple, independent settings sets.

Parameters:
  • dag_folder (unicode) – the folder to scan to find DAGs
  • executor – the executor to use when executing task instances in this DagBag
  • include_examples (bool) – whether to include the examples that ship with airflow or not
  • has_logged – an instance boolean that gets flipped from False to True after a file has been skipped. This is to prevent overloading the user with logging messages about skipped files. Therefore only once per DagBag is a file logged being skipped.
bag_dag(dag, parent_dag, root_dag)[source]

Adds the DAG into the bag, recurses into sub dags. Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags

collect_dags(dag_folder=None, only_if_updated=True, include_examples=True)[source]

Given a file path or a folder, this method looks for python modules, imports them and adds them to the dagbag collection.

Note that if a .airflowignore file is found while processing the directory, it will behave much like a .gitignore, ignoring files that match any of the regex patterns specified in the file.

Note: The patterns in .airflowignore are treated as un-anchored regexes, not shell-like glob patterns.

dagbag_report()[source]

Prints a report around DagBag loading stats

get_dag(dag_id)[source]

Gets the DAG out of the dictionary, and refreshes it if expired

kill_zombies(**kwargs)[source]

Fail given zombie tasks, which are tasks that haven’t had a heartbeat for too long, in the current DagBag.

Parameters:
  • zombies (airflow.utils.dag_processing.SimpleTaskInstance) – zombie task instances to kill.
  • session (sqlalchemy.orm.session.Session) – DB session.
process_file(filepath, only_if_updated=True, safe_mode=True)[source]

Given a path to a python module or zip file, this method imports the module and look for dag objects within it.

size()[source]
Returns:the amount of dags contained in this dagbag
class airflow.models.DagModel(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

create_dagrun(**kwargs)[source]

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.

Parameters:
  • run_id (str) – defines the the run id for this dag run
  • execution_date (datetime.datetime) – the execution date of this dag run
  • state (airflow.utils.state.State) – the state of the dag run
  • start_date (datetime.datetime) – the date this dag run should be evaluated
  • external_trigger (bool) – whether this dag run is externally triggered
  • session (sqlalchemy.orm.session.Session) – database session
class airflow.models.DagRun(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base, airflow.utils.log.logging_mixin.LoggingMixin

DagRun describes an instance of a Dag. It can be created by the scheduler (for regular runs) or by an external trigger

static find(*args, **kwargs)[source]

Returns a set of dag runs for the given search criteria.

Parameters:
  • dag_id (int, list) – the dag_id to find dag runs for
  • run_id (str) – defines the the run id for this dag run
  • execution_date (datetime.datetime) – the execution date
  • state (airflow.utils.state.State) – the state of the dag run
  • external_trigger (bool) – whether this dag run is externally triggered
  • no_backfills (bool) – return no backfills (True), return all (False). Defaults to False
  • session (sqlalchemy.orm.session.Session) – database session
get_dag()[source]

Returns the Dag associated with this DagRun.

Returns:DAG
classmethod get_latest_runs(**kwargs)[source]

Returns the latest DagRun for each DAG.

get_previous_dagrun(**kwargs)[source]

The previous DagRun, if there is one

get_previous_scheduled_dagrun(**kwargs)[source]

The previous, SCHEDULED DagRun, if there is one

static get_run(session, dag_id, execution_date)[source]
Parameters:
  • dag_id (unicode) – DAG ID
  • execution_date (datetime) – execution date
Returns:

DagRun corresponding to the given dag_id and execution date if one exists. None otherwise.

Return type:

DagRun

get_task_instance(**kwargs)[source]

Returns the task instance specified by task_id for this dag run

Parameters:task_id – the task id
get_task_instances(**kwargs)[source]

Returns the task instances for this dag run

refresh_from_db(**kwargs)[source]

Reloads the current dagrun from the database :param session: database session

update_state(**kwargs)[source]

Determines the overall state of the DagRun based on the state of its TaskInstances.

Returns:State
verify_integrity(**kwargs)[source]

Verifies the DagRun by checking for removed tasks or tasks that are not in the database yet. It will set state to removed or add the task if required.

exception airflow.models.InvalidFernetToken[source]

Bases: exceptions.Exception

class airflow.models.NullFernet[source]

Bases: future.types.newobject.newobject

A “Null” encryptor class that doesn’t encrypt or decrypt but that presents a similar interface to Fernet.

The purpose of this is to make the rest of the code not have to know the difference, and to only display the message once, not 20 times when airflow initdb is ran.

class airflow.models.Pool(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

open_slots(**kwargs)[source]

Returns the number of slots open at the moment

queued_slots(**kwargs)[source]

Returns the number of slots used at the moment

used_slots(**kwargs)[source]

Returns the number of slots used at the moment

class airflow.models.TaskInstance(task, execution_date, state=None)[source]

Bases: sqlalchemy.ext.declarative.api.Base, airflow.utils.log.logging_mixin.LoggingMixin

Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in.

The SqlAlchemy model doesn’t have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.

Database transactions on this table should insure double triggers and any confusion around what task instances are or aren’t ready to run even while multiple schedulers may be firing task instances.

are_dependencies_met(**kwargs)[source]

Returns whether or not all the conditions are met for this task instance to be run given the context for the dependencies (e.g. a task instance being force run from the UI will ignore some dependencies).

Parameters:
  • dep_context (DepContext) – The execution context that determines the dependencies that should be evaluated.
  • session (sqlalchemy.orm.session.Session) – database session
  • verbose (bool) – whether log details on failed dependencies on info or debug log level
are_dependents_done(**kwargs)[source]

Checks whether the dependents of this task instance have all succeeded. This is meant to be used by wait_for_downstream.

This is useful when you do not want to start processing the next schedule of a task until the dependents are done. For instance, if the task DROPs and recreates a table.

clear_xcom_data(**kwargs)[source]

Clears all XCom data from the database for the task instance

command(mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

Returns a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator.

command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

Returns a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator.

current_state(**kwargs)[source]

Get the very latest state from the database, if a session is passed, we use and looking up the state becomes part of the session, otherwise a new session is used.

error(**kwargs)[source]

Forces the task instance’s state to FAILED in the database.

static generate_command(dag_id, task_id, execution_date, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

Generates the shell command required to execute this task instance.

Parameters:
  • dag_id (unicode) – DAG ID
  • task_id (unicode) – Task ID
  • execution_date (datetime) – Execution date for the task
  • mark_success (bool) – Whether to mark the task as successful
  • ignore_all_deps (bool) – Ignore all ignorable dependencies. Overrides the other ignore_* parameters.
  • ignore_depends_on_past (bool) – Ignore depends_on_past parameter of DAGs (e.g. for Backfills)
  • ignore_task_deps (bool) – Ignore task-specific dependencies such as depends_on_past and trigger rule
  • ignore_ti_state (bool) – Ignore the task instance’s previous failure/success
  • local (bool) – Whether to run the task locally
  • pickle_id (unicode) – If the DAG was serialized to the DB, the ID associated with the pickled DAG
  • file_path – path to the file containing the DAG definition
  • raw – raw mode (needs more details)
  • job_id – job ID (needs more details)
  • pool (unicode) – the Airflow pool that the task should run in
  • cfg_path (basestring) – the Path to the configuration file
Returns:

shell command that can be used to run the task instance

get_dagrun(**kwargs)[source]

Returns the DagRun for this TaskInstance

Parameters:session
Returns:DagRun
init_on_load()[source]

Initialize the attributes that aren’t stored in the DB.

init_run_context(raw=False)[source]

Sets the log context.

is_eligible_to_retry()[source]

Is task instance is eligible for retry

is_premature

Returns whether a task is in UP_FOR_RETRY state and its retry interval has elapsed.

key

Returns a tuple that identifies the task instance uniquely

next_retry_datetime()[source]

Get datetime of the next retry