Using Operators

An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.

See the Operators Concepts documentation and the Operators API Reference for more information.
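
The snippets in this guide are excerpts from example DAGs; they assume that the required imports and a dag object are already defined. A minimal sketch of that boilerplate, assuming Airflow 1.10-style import paths (the dag_id is illustrative):

import time
from pprint import pprint

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

default_args = {
    'start_date': dates.days_ago(1),
}

dag = DAG(
    dag_id='example_operators',  # illustrative DAG id
    default_args=default_args,
    schedule_interval=None,
)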

BashOperator

Use the BashOperator to execute commands in a Bash shell.

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)

Templating

You can use Jinja templates to parameterize the bash_command argument.

also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)

Troubleshooting

Jinja template not found

Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow tries to apply a Jinja template to it, which will fail.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

PythonOperator

Use the PythonOperator to execute Python callables.

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

Passing in arguments

Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping 0.0, 0.1, 0.2, 0.3 and 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task

Templating

When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
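
For example, a value in templates_dict can reference Jinja variables such as {{ ds }}, and the callable reads the rendered result from the templates_dict keyword argument. A minimal sketch (the task id, callable, and 'message' key are illustrative, not part of the original example):

def print_rendered_template(**kwargs):
    # 'message' has already been rendered by Jinja at runtime,
    # e.g. 'Run date: 2019-01-01'
    print(kwargs['templates_dict']['message'])


templated_task = PythonOperator(
    task_id='print_rendered_template',
    provide_context=True,
    python_callable=print_rendered_template,
    templates_dict={'message': 'Run date: {{ ds }}'},
    dag=dag,
)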

Google Cloud Storage Operators

GoogleCloudStorageToBigQueryOperator

Use the GoogleCloudStorageToBigQueryOperator to execute a BigQuery load job.

load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

Google Compute Engine Operators

GceInstanceStartOperator

Use the GceInstanceStartOperator to start an existing Google Compute Engine instance.

Arguments

The following examples show OS environment variables used to pass arguments to the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
INSTANCE = os.environ.get('INSTANCE', 'testinstance')

Using the operator

gce_instance_start = GceInstanceStartOperator(
    project_id=PROJECT_ID,
    zone=ZONE,
    resource_id=INSTANCE,
    task_id='gcp_compute_start_task'
)

Templating

template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')

GceInstanceStopOperator

Use the operator to stop a Google Compute Engine instance.

For parameter definition take a look at GceInstanceStopOperator.

Arguments

The following examples show OS environment variables used to pass arguments to the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
INSTANCE = os.environ.get('INSTANCE', 'testinstance')

Using the operator

gce_instance_stop = GceInstanceStopOperator(
    project_id=PROJECT_ID,
    zone=ZONE,
    resource_id=INSTANCE,
    task_id='gcp_compute_stop_task'
)

Templating

template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')

GceSetMachineTypeOperator

Use the operator to change the machine type of a Google Compute Engine instance.

For parameter definition take a look at GceSetMachineTypeOperator.

Arguments

The following examples show OS environment variables used to pass arguments to the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
INSTANCE = os.environ.get('INSTANCE', 'testinstance')
SHORT_MACHINE_TYPE_NAME = os.environ.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
SET_MACHINE_TYPE_BODY = {
    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
}

Using the operator

gce_set_machine_type = GceSetMachineTypeOperator(
    project_id=PROJECT_ID,
    zone=ZONE,
    resource_id=INSTANCE,
    body=SET_MACHINE_TYPE_BODY,
    task_id='gcp_compute_set_machine_type'
)

Templating

template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')

GceInstanceTemplateCopyOperator

Use the operator to copy an existing Google Compute Engine instance template, applying a patch to it.

For parameter definition take a look at GceInstanceTemplateCopyOperator.

Arguments

The following examples show OS environment variables used to pass arguments to the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
                                   'instance-template-test-new')
NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description')
GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
    "name": NEW_TEMPLATE_NAME,
    "description": NEW_DESCRIPTION,
    "properties": {
        "machineType": "n1-standard-2"
    }
}

Using the operator

gce_instance_template_copy = GceInstanceTemplateCopyOperator(
    project_id=PROJECT_ID,
    resource_id=TEMPLATE_NAME,
    body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
    task_id='gcp_compute_igm_copy_template_task'
)

Templating

template_fields = ('project_id', 'resource_id', 'request_id',
                   'gcp_conn_id', 'api_version')

GceInstanceGroupManagerUpdateTemplateOperator

Use the operator to update the template in a Google Compute Engine Instance Group Manager.

For parameter definition take a look at GceInstanceGroupManagerUpdateTemplateOperator.

Arguments

The following examples show OS environment variables used to pass arguments to the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
                                             'instance-group-test')

SOURCE_TEMPLATE_URL = os.environ.get(
    'SOURCE_TEMPLATE_URL',
    "https://www.googleapis.com/compute/beta/projects/"
    "example-project/global/instanceTemplates/instance-template-test")

DESTINATION_TEMPLATE_URL = os.environ.get(
    'DESTINATION_TEMPLATE_URL',
    "https://www.googleapis.com/compute/beta/projects/"
    "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)

UPDATE_POLICY = {
    "type": "OPPORTUNISTIC",
    "minimalAction": "RESTART",
    "maxSurge": {
        "fixed": 1
    },
    "minReadySec": 1800
}

Using the operator

gce_instance_group_manager_update_template = \
    GceInstanceGroupManagerUpdateTemplateOperator(
        project_id=PROJECT_ID,
        resource_id=INSTANCE_GROUP_MANAGER_NAME,
        zone=ZONE,
        source_template=SOURCE_TEMPLATE_URL,
        destination_template=DESTINATION_TEMPLATE_URL,
        update_policy=UPDATE_POLICY,
        task_id='gcp_compute_igm_group_manager_update_template'
    )

Templating

template_fields = ('project_id', 'resource_id', 'zone', 'request_id',
                   'source_template', 'destination_template',
                   'gcp_conn_id', 'api_version')

Troubleshooting

You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails with missing permissions. To execute the operation, the service account must have the Service Account User role assigned via IAM permissions.

Google Cloud Functions Operators

GcfFunctionDeleteOperator

Use the operator to delete a function from Google Cloud Functions.

For parameter definition take a look at GcfFunctionDeleteOperator.

Arguments

The following examples of OS environment variables show how you can build the function name to use in the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
LOCATION = os.environ.get('LOCATION', 'europe-west1')
ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)

Using the operator

t1 = GcfFunctionDeleteOperator(
    task_id="gcf_delete_task",
    name=FUNCTION_NAME
)

Templating

template_fields = ('name', 'gcp_conn_id', 'api_version')

Troubleshooting

If you want to run or deploy an operator using a service account and get “forbidden 403” errors, it means that your service account does not have the correct Cloud IAM permissions.

  1. Assign your Service Account the Cloud Functions Developer role.
  2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.

The typical way of assigning Cloud IAM permissions with gcloud is shown below. Just replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.

gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"

See Adding the IAM service agent user role to the runtime service for details.

GcfFunctionDeployOperator

Use the operator to deploy a function to Google Cloud Functions.

For parameter definition take a look at GcfFunctionDeployOperator.

Arguments

The following examples of OS environment variables show several variants of arguments you can use with the operator:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
LOCATION = os.environ.get('LOCATION', 'europe-west1')
SOURCE_ARCHIVE_URL = os.environ.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = os.environ.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = os.environ.get(
    'SOURCE_REPOSITORY',
    'https://source.developers.google.com/'
    'projects/example-project/repos/hello-world/moveable-aliases/master')
ZIP_PATH = os.environ.get('ZIP_PATH', '')
ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = os.environ.get('VALIDATE_BODY', True)

With those variables you can define the body of the request:

body = {
    "name": FUNCTION_NAME,
    "entryPoint": ENTRYPOINT,
    "runtime": RUNTIME,
    "httpsTrigger": {}
}

When you create a DAG, the default_args dictionary can be used to pass arguments that are common to other tasks:

default_args = {
    'start_date': dates.days_ago(1)
}

Note that neither the body nor the default args are complete in the above examples. Depending on the variables set, there are different ways to pass the source code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository or sourceUploadUrl as described in the Cloud Functions API specification.

Additionally, default_args or direct operator args might contain the zip_path parameter to run the extra step of uploading the source code before deploying it. In this case, you also need to provide an empty sourceUploadUrl parameter in the body.

Using the operator

Based on the variables defined above, example logic for setting the source code related fields is shown here:

if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': SOURCE_REPOSITORY
    }
elif ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")

The code to create the operator:

deploy_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy_task",
    name=FUNCTION_NAME,
    project_id=PROJECT_ID,
    location=LOCATION,
    body=body,
    validate_body=VALIDATE_BODY
)

Templating

template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')

Troubleshooting

If you want to run or deploy an operator using a service account and get “forbidden 403” errors, it means that your service account does not have the correct Cloud IAM permissions.

  1. Assign your Service Account the Cloud Functions Developer role.
  2. Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.

The typical way of assigning Cloud IAM permissions with gcloud is shown below. Just replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.

gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"

See Adding the IAM service agent user role to the runtime service for details.

If the source code for your function is in Google Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.

Google Cloud SQL Operators

CloudSqlInstanceDatabaseCreateOperator

Creates a new database inside a Cloud SQL instance.

For parameter definition take a look at CloudSqlInstanceDatabaseCreateOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
    project_id=PROJECT_ID,
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id='sql_db_create_task'
)

Example request body:

db_create_body = {
    "instance": INSTANCE_NAME,
    "name": DB_NAME,
    "project": PROJECT_ID
}

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstanceDatabaseDeleteOperator

Deletes a database from a Cloud SQL instance.

For parameter definition take a look at CloudSqlInstanceDatabaseDeleteOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_delete_task'
)

Templating

template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')

CloudSqlInstanceDatabasePatchOperator

Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definition take a look at CloudSqlInstanceDatabasePatchOperator.

Arguments

Some arguments in the example DAG are taken from environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
    project_id=PROJECT_ID,
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_patch_task'
)

Example request body:

db_patch_body = {
    "charset": "utf16",
    "collation": "utf16_general_ci"
}

Templating

template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')

CloudSqlInstanceDeleteOperator

Deletes a Cloud SQL instance in Google Cloud Platform.

For parameter definition take a look at CloudSqlInstanceDeleteOperator.

Arguments

Some arguments in the example DAG are taken from OS environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Using the operator

sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    task_id='sql_instance_delete_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstanceCreateOperator

Creates a new Cloud SQL instance in Google Cloud Platform.

For parameter definition take a look at CloudSqlInstanceCreateOperator.

If an instance with the same name exists, no action will be taken and the operator will succeed.

Arguments

Some arguments in the example DAG are taken from OS environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Example body defining the instance:

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {
            "binaryLogEnabled": True,
            "enabled": True,
            "startTime": "05:00"
        },
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {
            "zone": "europe-west4-a"
        },
        "maintenanceWindow": {
            "hour": 5,
            "day": 7,
            "updateTrack": "canary"
        },
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": False,
        "storageAutoResizeLimit": 0,
        "userLabels": {
            "my-key": "my-value"
        }
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

Using the operator

sql_instance_create_task = CloudSqlInstanceCreateOperator(
    project_id=PROJECT_ID,
    body=body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_create_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlInstancePatchOperator

Updates settings of a Cloud SQL instance in Google Cloud Platform (partial update).

For parameter definition take a look at CloudSqlInstancePatchOperator.

This is a partial update, so only values for the settings specified in the body will be set / updated. The rest of the existing instance’s configuration will remain unchanged.

Arguments

Some arguments in the example DAG are taken from OS environment variables:

PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
DB_NAME = os.environ.get('DB_NAME', 'testdb')

Example body defining the instance:

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {
            "hour": 3,
            "day": 6,
            "updateTrack": "canary"
        },
        "userLabels": {
            "my-key-patch": "my-value-patch"
        }
    }
}

Using the operator

sql_instance_patch_task = CloudSqlInstancePatchOperator(
    project_id=PROJECT_ID,
    body=patch_body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_patch_task'
)

Templating

template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

CloudSqlQueryOperator

Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported; you can run SELECT queries, but their results are discarded.

You can specify various connectivity methods to connect to a running instance: a plain connection via public IP, a public IP connection with SSL, or both TCP and UNIX socket connections via Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically by the operator as needed.

There is a gcpcloudsql:// connection type that you should use to define what kind of connectivity you want the operator to use. It is a "meta" type of connection: it does not establish connectivity on its own, but determines whether CloudSqlDatabaseHook should start a Cloud SQL Proxy and what kind of database connection (PostgreSQL or MySQL) should be created dynamically, connecting to Cloud SQL either via public IP address or via the proxy. The CloudSqlDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task gets its own Cloud SQL Proxy).

When you build a connection, use the connection parameters described in CloudSqlDatabaseHook. You can see example connections below for all the possible types of connectivity. Such a connection can be reused between different tasks (instances of CloudSqlQueryOperator), as sketched below; each task gets its own proxy, started if needed, with its own TCP port or UNIX socket.
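
For instance, two CloudSqlQueryOperator tasks can share the same gcp_cloudsql_conn_id, and each task still starts and stops its own proxy when the connection requires one. A minimal sketch, assuming the proxy_postgres_tcp connection defined later in this section (task ids and SQL statements are illustrative):

sql_create_task = CloudSqlQueryOperator(
    gcp_cloudsql_conn_id='proxy_postgres_tcp',
    task_id='example_sql_create',
    sql='CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
    dag=dag,
)

sql_insert_task = CloudSqlQueryOperator(
    gcp_cloudsql_conn_id='proxy_postgres_tcp',
    task_id='example_sql_insert',
    sql='INSERT INTO TABLE_TEST VALUES (0)',
    dag=dag,
)

sql_create_task >> sql_insert_task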

For parameter definition take a look at CloudSqlQueryOperator.

Since the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. You should design your queries to be idempotent. For example, both PostgreSQL and MySQL support CREATE TABLE IF NOT EXISTS statements that can be used to create tables in an idempotent way.

Arguments

If you define the connection via an AIRFLOW_CONN_* URL in an environment variable, make sure the URL components are URL-encoded. See the examples below for details.

Note that for SSL connections you need a mechanism to make the certificate/key files available at predefined locations on all the workers on which the operator can run. This can be provided, for example, by mounting NFS-like volumes at the same path on all the workers.

Some arguments in the example DAG are taken from the OS environment variables:


PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
LOCATION = os.environ.get('REGION', 'europe-west-1')

POSTGRES_INSTANCE_NAME = os.environ.get('POSTGRES_INSTANCE_NAME', 'testpostgres')
POSTGRES_DATABASE_NAME = os.environ.get('POSTGRES_DATABASE_NAME', 'postgresdb')
POSTGRES_USER = os.environ.get('POSTGRES_USER', 'postgres_user')
POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', 'password')
POSTGRES_PUBLIC_IP = os.environ.get('POSTGRES_PUBLIC_IP', '0.0.0.0')
POSTGRES_PUBLIC_PORT = os.environ.get('POSTGRES_PUBLIC_PORT', 5432)
POSTGRES_CLIENT_CERT_FILE = os.environ.get('POSTGRES_CLIENT_CERT_FILE',
                                           "/tmp/client-cert.pem")
POSTGRES_CLIENT_KEY_FILE = os.environ.get('POSTGRES_CLIENT_KEY_FILE',
                                          "/tmp/client-key.pem")
POSTGRES_SERVER_CA_FILE = os.environ.get('POSTGRES_SERVER_CA_FILE',
                                         "/tmp/server-ca.pem")

MYSQL_INSTANCE_NAME = os.environ.get('MYSQL_INSTANCE_NAME', 'testmysql')
MYSQL_DATABASE_NAME = os.environ.get('MYSQL_DATABASE_NAME', 'mysqldb')
MYSQL_USER = os.environ.get('MYSQL_USER', 'mysql_user')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password')
MYSQL_PUBLIC_IP = os.environ.get('MYSQL_PUBLIC_IP', '0.0.0.0')
MYSQL_PUBLIC_PORT = os.environ.get('MYSQL_PUBLIC_PORT', 3306)
MYSQL_CLIENT_CERT_FILE = os.environ.get('MYSQL_CLIENT_CERT_FILE',
                                        "/tmp/client-cert.pem")
MYSQL_CLIENT_KEY_FILE = os.environ.get('MYSQL_CLIENT_KEY_FILE',
                                       "/tmp/client-key.pem")
MYSQL_SERVER_CA_FILE = os.environ.get('MYSQL_SERVER_CA_FILE',
                                      "/tmp/server-ca.pem")

SQL = [
    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',  # shows warnings logged
    'INSERT INTO TABLE_TEST VALUES (0)',
    'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
    'DROP TABLE TABLE_TEST',
    'DROP TABLE TABLE_TEST2',
]

Example connection definitions for all connectivity cases. Note that all the components of the connection URI should be URL-encoded:


postgres_kwargs = dict(
    user=quote_plus(POSTGRES_USER),
    password=quote_plus(POSTGRES_PASSWORD),
    public_port=POSTGRES_PUBLIC_PORT,
    public_ip=quote_plus(POSTGRES_PUBLIC_IP),
    project_id=quote_plus(PROJECT_ID),
    location=quote_plus(LOCATION),
    instance=quote_plus(POSTGRES_INSTANCE_NAME),
    database=quote_plus(POSTGRES_DATABASE_NAME),
    client_cert_file=quote_plus(POSTGRES_CLIENT_CERT_FILE),
    client_key_file=quote_plus(POSTGRES_CLIENT_KEY_FILE),
    server_ca_file=quote_plus(POSTGRES_SERVER_CA_FILE)
)

# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_*. The connections can also be created in the Airflow
# database (using the command line or the UI).

# Postgres: connect via proxy over TCP
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_SOCKET'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_version=v1.13&" \
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)

# Postgres: connect directly via TCP (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=False&" \
    "use_ssl=False".format(**postgres_kwargs)

# Postgres: connect directly via TCP (SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=False&" \
    "use_ssl=True&" \
    "sslcert={client_cert_file}&" \
    "sslkey={client_key_file}&" \
    "sslrootcert={server_ca_file}"\
    .format(**postgres_kwargs)

mysql_kwargs = dict(
    user=quote_plus(MYSQL_USER),
    password=quote_plus(MYSQL_PASSWORD),
    public_port=MYSQL_PUBLIC_PORT,
    public_ip=quote_plus(MYSQL_PUBLIC_IP),
    project_id=quote_plus(PROJECT_ID),
    location=quote_plus(LOCATION),
    instance=quote_plus(MYSQL_INSTANCE_NAME),
    database=quote_plus(MYSQL_DATABASE_NAME),
    client_cert_file=quote_plus(MYSQL_CLIENT_CERT_FILE),
    client_key_file=quote_plus(MYSQL_CLIENT_KEY_FILE),
    server_ca_file=quote_plus(MYSQL_SERVER_CA_FILE)
)

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ['AIRFLOW_CONN_PROXY_MYSQL_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=mysql&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_version=v1.13&" \
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)

# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
try:
    sql_proxy_binary_path = subprocess.check_output(
        ['which', 'cloud_sql_proxy']).rstrip()
except subprocess.CalledProcessError:
    sql_proxy_binary_path = "/tmp/anyhow_download_cloud_sql_proxy"

os.environ['AIRFLOW_CONN_PROXY_MYSQL_SOCKET'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=mysql&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_binary_path={sql_proxy_binary_path}&" \
    "sql_proxy_use_tcp=False".format(
        sql_proxy_binary_path=quote_plus(sql_proxy_binary_path), **mysql_kwargs)

# MySQL: connect directly via TCP (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=mysql&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=False&" \
    "use_ssl=False".format(**mysql_kwargs)

# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=mysql&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=False&" \
    "use_ssl=True&" \
    "sslcert={client_cert_file}&" \
    "sslkey={client_key_file}&" \
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)

Using the operator

The example operators below use all the connectivity options. Note that the connection id passed to each operator corresponds to the AIRFLOW_CONN_* environment variable name with the prefix removed and lowercased; this is the standard Airflow convention for defining connections via environment variables:


connection_names = [
    "proxy_postgres_tcp",
    "proxy_postgres_socket",
    "public_postgres_tcp",
    "public_postgres_tcp_ssl",
    "proxy_mysql_tcp",
    "proxy_mysql_socket",
    "public_mysql_tcp",
    "public_mysql_tcp_ssl"
]

tasks = []

with models.DAG(
    dag_id='example_gcp_sql_query',
    default_args=default_args,
    schedule_interval=None
) as dag:
    for connection_name in connection_names:
        tasks.append(
            CloudSqlQueryOperator(
                gcp_cloudsql_conn_id=connection_name,
                task_id="example_gcp_sql_task_" + connection_name,
                sql=SQL
            )
        )

Templating

template_fields = ('sql', 'gcp_cloudsql_conn_id', 'gcp_conn_id')
template_ext = ('.sql',)

More information

See the Google Cloud SQL Proxy documentation for details about the Cloud SQL Proxy.

Google Cloud Storage Operators

GoogleCloudStorageBucketCreateAclEntryOperator

Creates a new ACL entry on the specified bucket.

For parameter definition take a look at GoogleCloudStorageBucketCreateAclEntryOperator.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket')
GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object')
GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity')
GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role')
GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role')

Using the operator

gcs_bucket_create_acl_entry_task = GoogleCloudStorageBucketCreateAclEntryOperator(
    bucket=GCS_ACL_BUCKET,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task"
)

Templating

template_fields = ('bucket', 'entity', 'role', 'user_project')

GoogleCloudStorageObjectCreateAclEntryOperator

Creates a new ACL entry on the specified object.

For parameter definition take a look at GoogleCloudStorageObjectCreateAclEntryOperator.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket')
GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object')
GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity')
GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role')
GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role')

Using the operator

gcs_object_create_acl_entry_task = GoogleCloudStorageObjectCreateAclEntryOperator(
    bucket=GCS_ACL_BUCKET,
    object_name=GCS_ACL_OBJECT,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task"
)

Templating

template_fields = ('bucket', 'object_name', 'entity', 'role', 'generation',
                   'user_project')