Integration

AWS: Amazon Web Services

GCP: Google Cloud Platform

Airflow has extensive support for the Google Cloud Platform. Note, however, that most Hooks and Operators are in the contrib section, which means they have beta status and can have breaking changes between minor releases.

BigQuery

BigQuery Operators

BigQueryCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator(sql, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Performs checks against BigQuery. The BigQueryCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using Python bool casting. If any of the values evaluates to False, the check fails and errors out.

Note that Python bool casting evaluates the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, the check fails only if the count is 0. You can craft much more complex queries that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviations from the 7-day average.
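The pass/fail rule described above can be sketched in plain Python. This is a minimal illustration of the bool-casting logic only, not the operator's actual implementation; the function name row_passes is hypothetical, and treating an empty row as a failure is an assumption of this sketch.

```python
def row_passes(row):
    """Return True when every value in the first result row is truthy."""
    # Assumption of this sketch: a missing or empty row counts as a failure.
    if not row:
        return False
    return all(bool(value) for value in row)


# SELECT COUNT(*) FROM foo returned a count of 42: the check passes.
print(row_passes([42]))
# The count was 0, which casts to False: the check fails.
print(row_passes([0]))
# A single falsy value among several is enough to fail the check.
print(row_passes([10, "", 3.5]))
```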

This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you can place it on the critical path, stopping the DAG and preventing dubious data from being published, or on the side, receiving email alerts without stopping the progress of the DAG.

Parameters:
  • sql (string) – the sql to be executed
  • bigquery_conn_id – reference to the BigQuery database
BigQueryValueCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator(sql, pass_value, tolerance=None, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Performs a simple value check using sql code.

Parameters:sql (string) – the sql to be executed
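The signature above takes a pass_value and an optional tolerance, but the text does not say how the two combine. One plausible reading, sketched here as an assumption rather than confirmed operator behaviour, is that a numeric result passes when it lies within pass_value plus or minus tolerance times pass_value, and must match exactly when tolerance is None. The helper name value_within_tolerance is hypothetical.

```python
def value_within_tolerance(actual, pass_value, tolerance=None):
    """Compare a numeric query result against an expected value."""
    if tolerance is None:
        # Without a tolerance, this sketch requires an exact match.
        return actual == pass_value
    # sorted() keeps the bounds in order even when pass_value is negative.
    lower, upper = sorted((pass_value * (1 - tolerance),
                           pass_value * (1 + tolerance)))
    return lower <= actual <= upper


print(value_within_tolerance(105, 100, tolerance=0.1))
print(value_within_tolerance(120, 100, tolerance=0.1))
```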
BigQueryIntervalCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

This method constructs a query like so:

SELECT {metrics_threshold_dict_key} FROM {table}
WHERE {date_filter_column}=<date>
Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_thresholds (dict) – a dictionary of ratios indexed by metrics. For example, ‘COUNT(*)’: 1.5 would require a difference of 50 percent or less between the current day and the day days_back before.
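The threshold rule can be illustrated with a short sketch. It assumes the comparison uses the ratio of the larger value to the smaller one, and that a ratio exactly equal to the threshold still passes (following the "50 percent or less" wording above). Both points, and the helper name metric_within_threshold, are assumptions made for illustration.

```python
def metric_within_threshold(current, reference, max_ratio):
    """Compare today's metric with the earlier one via larger/smaller ratio."""
    smaller, larger = sorted((abs(current), abs(reference)))
    if smaller == 0:
        # A ratio against zero is undefined; pass only if both values are zero.
        return larger == 0
    return float(larger) / smaller <= max_ratio


# COUNT(*) was 100 a week ago and is 120 today: ratio 1.2, within 1.5.
print(metric_within_threshold(120, 100, 1.5))
# COUNT(*) was 160 a week ago and is 100 today: ratio 1.6, outside 1.5.
print(metric_within_threshold(100, 160, 1.5))
```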
BigQueryOperator
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(bql, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=False, use_legacy_sql=True, *args, **kwargs)[source]

Executes BigQuery SQL queries in a specific BigQuery database

BigQueryToBigQueryOperator
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator(source_project_dataset_tables, destination_project_dataset_table, write_disposition='WRITE_EMPTY', create_disposition='CREATE_IF_NEEDED', bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Copy a BigQuery table to another BigQuery table.

BigQueryToCloudStorageOperator
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=', ', print_header=True, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Transfers a BigQuery table to a Google Cloud Storage bucket.

BigQueryHook

class airflow.contrib.hooks.bigquery_hook.BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None)[source]

Interact with BigQuery. This hook uses the Google Cloud Platform connection.

get_conn()[source]

Returns a BigQuery PEP 249 connection object.

get_pandas_df(bql, parameters=None)[source]

Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn’t support PEP 249 connections, except for SQLite. See:

https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900

Parameters:bql (string) – The BigQuery SQL to execute.
get_service()[source]

Returns a BigQuery service object.

insert_rows(table, rows, target_fields=None, commit_every=1000)[source]

Insertion is currently unsupported. Theoretically, you could use BigQuery’s streaming API to insert rows into a table, but this hasn’t been implemented.

table_exists(project_id, dataset_id, table_id)[source]

Checks for the existence of a table in Google BigQuery.

Parameters:
  • project_id (string) – The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
  • dataset_id (string) – The name of the dataset in which to look for the table.
  • table_id (string) – The name of the table to check the existence of.
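A common pattern is to guard a query with table_exists before calling get_pandas_df. The sketch below takes any object that offers those two methods, so it runs without Airflow or credentials. StubBigQueryHook and query_if_table_exists are hypothetical names introduced for illustration; in a DAG you would pass a real BigQueryHook instead of the stub.

```python
def query_if_table_exists(hook, project_id, dataset_id, table_id, bql):
    """Run bql only when the table is present; return None otherwise."""
    if not hook.table_exists(project_id, dataset_id, table_id):
        return None
    return hook.get_pandas_df(bql)


class StubBigQueryHook(object):
    """Hypothetical stand-in that mimics the two hook methods used above."""

    def __init__(self, known_tables):
        self.known_tables = set(known_tables)

    def table_exists(self, project_id, dataset_id, table_id):
        return (project_id, dataset_id, table_id) in self.known_tables

    def get_pandas_df(self, bql, parameters=None):
        # A real hook returns a pandas DataFrame; the stub returns a list.
        return [{"bql": bql}]


stub = StubBigQueryHook([("my-gcp-project", "my_dataset", "events")])
print(query_if_table_exists(stub, "my-gcp-project", "my_dataset", "events",
                            "SELECT COUNT(*) FROM my_dataset.events"))
print(query_if_table_exists(stub, "my-gcp-project", "my_dataset", "missing",
                            "SELECT 1"))
```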

Cloud DataFlow

DataFlow Operators

DataFlowJavaOperator
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.

```
default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}
```

You need to pass the path to your dataflow jar as a file reference with the jar parameter; the jar needs to be a self-executing jar. Use options to pass on options to your job.

```
t1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=my_dag)
```

Both jar and options are templated so you can use variables in them.

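from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 8, 1),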
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'

    },
    dag=dag)
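In the example above the job receives settings from two places: dataflow_default_options in default_args and the per-task options. The sketch below shows one way such settings could be combined and turned into command-line flags. It assumes that per-task options override the defaults and that each entry becomes a --key=value argument; both are assumptions for illustration, and build_job_args is a hypothetical helper, not part of Airflow.

```python
def build_job_args(default_options, options):
    """Merge default and per-task options into sorted --key=value flags."""
    merged = dict(default_options or {})
    # Assumption: a per-task option wins over a default with the same key.
    merged.update(options or {})
    return ["--{}={}".format(key, merged[key]) for key in sorted(merged)]


args = build_job_args(
    {"project": "my-gcp-project", "zone": "us-central1-f"},
    {"maxNumWorkers": "50", "zone": "europe-west1-d"},
)
print(args)
```

Sorting the keys is only there to make the output deterministic; the order of flags carries no meaning in this sketch.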

DataFlowHook

class airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook(gcp_conn_id='google_cloud_default', delegate_to=None)[source]
get_conn()[source]

Returns a Google Cloud Dataflow service object.

Cloud DataProc

DataProc Operators

DataProcPigOperator
class airflow.contrib.operators.dataproc_operator.DataProcPigOperator(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.

It’s a good practice to define dataproc_* parameters in the default_args of the dag like the cluster name and UDFs.

```
default_args = {
    'dataproc_cluster': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}
```

You can pass a pig script as a string or as a file reference. Use variables to pass on variables for the pig script to be resolved on the cluster, or use the parameters to be resolved in the script as template parameters.

```
t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
    dag=dag)
```

DataProcHiveOperator
class airflow.contrib.operators.dataproc_operator.DataProcHiveOperator(query, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Hive query Job on a Cloud DataProc cluster.

DataProcSparkSqlOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator(query, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Spark SQL query Job on a Cloud DataProc cluster.

DataProcSparkOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Spark Job on a Cloud DataProc cluster.

DataProcHadoopOperator
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a Hadoop Job on a Cloud DataProc cluster.

DataProcPySparkOperator

class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(main, arguments=None, archives=None, pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Start a PySpark Job on a Cloud DataProc cluster.
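All DataProc operators above share the default job_name of '{{task.task_id}}_{{ds_nodash}}'. Airflow fills this in through its template engine; the sketch below reproduces the resulting string with plain string handling, assuming ds is the execution date as YYYY-MM-DD and ds_nodash is the same date with the dashes removed. The helper name default_job_name is hypothetical.

```python
from datetime import date


def default_job_name(task_id, execution_date):
    """Build the string the default job_name template would render to."""
    ds = execution_date.isoformat()      # for example '2016-08-01'
    ds_nodash = ds.replace("-", "")      # for example '20160801'
    return "{}_{}".format(task_id, ds_nodash)


print(default_job_name("dataproc_pig", date(2016, 8, 1)))
```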

Cloud Datastore

Datastore Operators

class airflow.contrib.hooks.datastore_hook.DatastoreHook(datastore_conn_id='google_cloud_datastore_default', delegate_to=None)[source]

Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform connection.

This object is not thread safe. If you want to make multiple requests simultaneously, you will need to create a hook per thread.
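One way to get a hook per thread is to cache it in thread-local storage. The sketch below is a minimal illustration: hook_for_current_thread and make_stub_hook are hypothetical names, and the stub factory returns a plain object so the example runs without Airflow. In a real setup the factory would construct a DatastoreHook.

```python
import threading

_thread_state = threading.local()


def hook_for_current_thread(make_hook):
    """Return this thread's hook, creating it on first use."""
    hook = getattr(_thread_state, "hook", None)
    if hook is None:
        hook = make_hook()
        _thread_state.hook = hook
    return hook


created = []


def make_stub_hook():
    # Stand-in for constructing a real hook; records each creation.
    hook = object()
    created.append(hook)
    return hook


def worker():
    # Two lookups in the same thread share one hook.
    first = hook_for_current_thread(make_stub_hook)
    second = hook_for_current_thread(make_stub_hook)
    assert first is second


threads = [threading.Thread(target=worker) for _ in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# One hook was created for each of the three worker threads.
print(len(created))
```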

allocate_ids(partialKeys)[source]

Allocate IDs for incomplete keys. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/allocateIds

Parameters:partialKeys – a list of partial keys
Returns:a list of full keys.
begin_transaction()[source]

Get a new transaction handle. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/beginTransaction

Returns:a transaction handle
commit(body)[source]

Commit a transaction, optionally creating, deleting or modifying some entities. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/commit

Parameters:body – the body of the commit request
Returns:the response body of the commit request
get_conn()[source]

Returns a Google Cloud Datastore service object.

lookup(keys, read_consistency=None, transaction=None)[source]

Lookup some entities by key. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/lookup

Parameters:
  • keys – the keys to lookup
  • read_consistency – the read consistency to use: default, strong or eventual. Cannot be used with a transaction.
  • transaction – the transaction to use, if any.
Returns:the response body of the lookup request.
rollback(transaction)[source]

Roll back a transaction. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/rollback

Parameters:transaction – the transaction to roll back

run_query(body)[source]

Run a query for entities. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/runQuery

Parameters:body – the body of the query request
Returns:the batch of query results.
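The begin_transaction, commit and rollback methods fit together naturally in a small wrapper that rolls back when the commit fails. The sketch below accepts any object with those three methods. The layout of the commit body is defined by the Datastore API and is left to a caller-supplied build_body function here; run_in_transaction and StubDatastoreHook are hypothetical names used only for illustration.

```python
def run_in_transaction(hook, build_body):
    """Commit the body built for a new transaction; roll back on error."""
    transaction = hook.begin_transaction()
    try:
        return hook.commit(build_body(transaction))
    except Exception:
        hook.rollback(transaction)
        raise


class StubDatastoreHook(object):
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self, fail_on_commit=False):
        self.fail_on_commit = fail_on_commit
        self.calls = []

    def begin_transaction(self):
        self.calls.append("begin")
        return "txn-1"

    def commit(self, body):
        self.calls.append("commit")
        if self.fail_on_commit:
            raise RuntimeError("commit failed")
        return {"committed": body}

    def rollback(self, transaction):
        self.calls.append("rollback")


hook = StubDatastoreHook(fail_on_commit=True)
try:
    run_in_transaction(hook, lambda transaction: {"handle": transaction})
except RuntimeError:
    pass
# The failed commit is followed by a rollback.
print(hook.calls)
```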

Cloud Storage

Storage Operators

GoogleCloudStorageDownloadOperator
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(bucket, object, filename=False, store_to_xcom_key=False, google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, *args, **kwargs)[source]

Downloads a file from Google Cloud Storage.

GoogleCloudStorageToBigQueryOperator
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=', ', max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, schema_update_options=(), *args, **kwargs)[source]

Loads files from Google cloud storage into BigQuery.

GoogleCloudStorageHook

class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None)[source]

Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.

download(bucket, object, filename=False)[source]

Get a file from Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to fetch from.
  • object (string) – The object to fetch.
  • filename (string) – If set, a local file path where the file should be written to.
exists(bucket, object)[source]

Checks for the existence of a file in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
get_conn()[source]

Returns a Google Cloud Storage service object.

is_updated_after(bucket, object, ts)[source]

Checks if an object is updated in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
  • ts (datetime) – The timestamp to check against.
upload(bucket, object, filename, mime_type='application/octet-stream')[source]

Uploads a local file to Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to upload to.
  • object (string) – The object name to set when uploading the local file.
  • filename (string) – The local file path to the file to be uploaded.
  • mime_type (string) – The MIME type to set when uploading the file.
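The exists and upload methods can be combined to avoid overwriting an object that is already in the bucket. The sketch below works with any object that offers those two methods, so it runs without Airflow or credentials. upload_if_missing and StubStorageHook are hypothetical names for illustration; in a DAG you would pass a GoogleCloudStorageHook instead of the stub.

```python
def upload_if_missing(hook, bucket, obj, filename,
                      mime_type="application/octet-stream"):
    """Upload filename as obj unless the bucket already holds that object."""
    if hook.exists(bucket, obj):
        return False
    hook.upload(bucket, obj, filename, mime_type)
    return True


class StubStorageHook(object):
    """Hypothetical in-memory stand-in for the two hook methods used above."""

    def __init__(self):
        self.stored = set()

    def exists(self, bucket, object):
        return (bucket, object) in self.stored

    def upload(self, bucket, object, filename,
               mime_type="application/octet-stream"):
        self.stored.add((bucket, object))


stub = StubStorageHook()
# The first call uploads; the second finds the object and skips the upload.
print(upload_if_missing(stub, "my-bucket", "data/report.csv", "/tmp/report.csv"))
print(upload_if_missing(stub, "my-bucket", "data/report.csv", "/tmp/report.csv"))
```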