Google Cloud Transfer Service Operators

Prerequisite Tasks

To use these operators, you must do a few things:

GcpTransferServiceJobCreateOperator

Create a transfer job.

The function accepts dates in two formats:

  • consistent with Google API

    { "year": 2019, "month": 2, "day": 11 }
    
  • as a datetime object

The function accepts time in two formats:

  • consistent with Google API

    { "hours": 12, "minutes": 30, "seconds": 0 }
    
  • as a time object

If you want to create a transfer job that copies data from AWS S3, you must have an AWS connection configured. Information about configuring the AWS connection is available in: Amazon Web Services Connection. The AWS connection to use can be specified with the aws_conn_id parameter.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

gcs_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}  # type: Dict[str, Any]

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)

Templating

template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')

GcpTransferServiceJobDeleteOperator

Deletes a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

delete_transfer_from_aws_job = GcpTransferServiceJobDeleteOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)

Templating

template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')

GcpTransferServiceJobUpdateOperator

Updates a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
    TRANSFER_JOB_FIELD_MASK: "description",
}

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

update_job = GcpTransferServiceJobUpdateOperator(
    task_id="update_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    body=update_body,
)

Templating

template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')

GcpTransferServiceOperationCancelOperator

Cancels a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

cancel_operation = GcpTransferServiceOperationCancelOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GcpTransferServiceOperationGetOperator

Gets a transfer operation. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

get_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id')

GcpTransferServiceOperationsListOperator

Lists transfer operations. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

list_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_operations",
    filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)

Templating

template_fields = ('filter', 'gcp_conn_id')

GcpTransferServiceOperationPauseOperator

Pauses a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

pause_operation = GcpTransferServiceOperationPauseOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GcpTransferServiceOperationResumeOperator

Resumes a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

resume_operation = GcpTransferServiceOperationResumeOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GCPTransferServiceWaitForJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.pyView Source

wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)

Templating

template_fields = ('job_name',)