Airflow
1.10.13
  • Project
  • License
  • Quick Start
  • Installation
  • Tutorial
  • How-to Guides
    • Setting Configuration Options
    • Initializing a Database Backend
    • Using Operators
    • Customizing state colours in UI
    • Creating a custom Operator
    • Managing Connections
    • Securing Connections
    • Rotating encryption keys
    • Writing Logs
    • Running Airflow behind a reverse proxy
    • Running Airflow with systemd
    • Running Airflow with upstart
    • Using the Test Mode Configuration
    • Checking Airflow Health Status
    • Define an operator extra link
      • Add or override Links to Existing Operators
    • Email Configuration
    • Tracking User Activity
    • Alternative secrets backend
  • UI / Screenshots
  • Concepts
  • Data Profiling
  • Scheduler
  • Executor
  • DAG Runs
  • Plugins
  • Security
  • Time zones
  • Using the CLI
  • Integration
  • Metrics
  • Error Tracking
  • Kubernetes
  • Lineage
  • DAG Serialization
  • Backport Providers
  • Changelog
  • Best Practices
  • Production Deployment
  • FAQ
  • Privacy Notice

References

  • CLI
  • Macros
  • Python API
  • Configurations
  • REST API
Airflow
  • »
  • How-to Guides »
  • Define an operator extra link
  • Edit on GitHub

Define an operator extra link¶

For each operator, you can define its own extra links that can redirect users to external systems. The extra link buttons will be available on the task page:

../_images/operator_extra_link.png

The following code shows how to add extra links to an operator:

Note

In order for extra links to be rendered you must be using the RBAC UI, NOT the Flask UI.

from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.utils.decorators import apply_defaults


class GoogleLink(BaseOperatorLink):
    name = 'Google'

    def get_link(self, operator, dttm):
        return "https://www.google.com"

class MyFirstOperator(BaseOperator):

    operator_extra_links = (
        GoogleLink(),
    )

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyFirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        self.log.info("Hello World!")

You can also add a global operator extra link that will be available to all the operators through an airflow plugin. Learn more about it in the plugin example.

Add or override Links to Existing Operators¶

You can also add (or override) an extra link to an existing operators through an Airflow plugin.

For example, the following Airflow plugin will add an Operator Link on all tasks using GoogleCloudStorageToS3Operator operator.

Adding Operator Links to Existing Operators plugins/extra_link.py:

from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperatorLink
from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator

class S3LogLink(BaseOperatorLink):
    name = 'S3'

    # Add list of all the operators to which you want to add this OperatorLinks
    # Example: operators = [GoogleCloudStorageToS3Operator, GoogleCloudStorageToBigQueryOperator]
    operators = [GoogleCloudStorageToS3Operator]

    def get_link(self, operator, dttm):
        return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            execution_date=dttm,
        )

# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [S3LogLink(), ]

Overriding Operator Links of Existing Operators:

It is also possible to replace a built in link on an operator via a Plugin. For example BigQueryOperator includes a link to the GCP console, but if we wanted to change that link we could:

from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperatorLink
from airflow.gcp.operators.bigquery import BigQueryOperator

# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = 'http://console.cloud.google.com/bigquery?j={job_id}'


class BigQueryConsoleLink(BaseOperatorLink):
    """
    Helper class for constructing BigQuery link.
    """
    name = 'BigQuery Console'
    operators = [BigQueryOperator]

    def get_link(self, operator, dttm):
        ti = TaskInstance(task=operator, execution_date=dttm)
        job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ''

# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [BigQueryConsoleLink(), ]
Next Previous

Revision 5b61c21a.

Built with Sphinx using a theme provided by Read the Docs.
This page uses Google Analytics to collect statistics. You can disable it by blocking the JavaScript coming from www.google-analytics.com. Check our Privacy Policy for more details.
Read the Docs v: 1.10.13
Versions
latest
stable
1.10.13
1.10.12
1.10.11
1.10.10
1.10.9
1.10.8
1.10.7
1.10.6
1.10.5
1.10.4
1.10.3post1
1.10.2
1.10.1
1.10.0
1.9.0
Downloads
On Read the Docs
Project Home
Builds