Apache Airflow
Adding Modzy to your programmatic workflows in Airflow makes it simple to process incoming data with AI and ML models.
Apache Airflow is an open-source, Python-based tool used to create, monitor, and schedule workflows. Airflow's scalability, extensibility, community, and integration with DevOps tools have made it a leading platform for data scientists building and orchestrating data ingestion, transformation, and processing workflows.
If you are new to Airflow, follow the installation directions, using an image with Python 3.8 or higher (Airflow defaults to Python 3.6), such as apache/airflow:2.1.2-python3.8.
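If you run Airflow with the official docker-compose.yaml, one way to pin that image and pull in the Modzy SDK is shown below; the modzy-sdk package name and the _PIP_ADDITIONAL_REQUIREMENTS variable are assumptions to verify against PyPI and your compose file version before relying on them:
AIRFLOW_IMAGE_NAME=apache/airflow:2.1.2-python3.8 _PIP_ADDITIONAL_REQUIREMENTS=modzy-sdk docker-compose up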
A DAG (Directed Acyclic Graph) is the central concept Airflow uses to organize and execute tasks. Below is a simple template DAG that calls the Modzy Python SDK within a workflow via a PythonOperator. Once the PythonOperator initiates, the Modzy SDK executes a specified data analysis process, in this example a Sentiment Analysis model. (Note: while the PythonOperator is generally the simplest, there are other ways of making an API call to Modzy in Airflow. We also recommend adding the Modzy API as a connection and using the SimpleHttpOperator, as sketched below.)
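As a minimal sketch of that SimpleHttpOperator route, a task like the following could sit inside the with DAG block shown later; the modzy_api connection name is our assumption (configure it in the Airflow UI to point at https://app.modzy.com/api), and the request body is our reading of Modzy's job submission API, so check the API reference before relying on it:
import json
from airflow.providers.http.operators.http import SimpleHttpOperator

submit_via_http = SimpleHttpOperator(
    task_id='submit_via_http',
    http_conn_id='modzy_api',  # an Airflow connection pointing at the Modzy API (assumed name)
    endpoint='jobs',           # job submission endpoint
    method='POST',
    headers={
        'Authorization': 'ApiKey <your.key>',
        'Content-Type': 'application/json',
    },
    data=json.dumps({
        'model': {'identifier': 'ed542963de', 'version': '1.0.1'},
        'input': {
            'type': 'text',
            'sources': {'airflow_input': {'input.txt': 'Text to analyze.'}},
        },
    }),
)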
Here we use a simple function to retrieve the sentiment analysis results; this could easily be replaced by an import of a more advanced function. Note that Airflow tasks operate asynchronously, returning success as soon as any response is received from the API. Using the block_until_complete function in the Modzy SDK mitigates this when downstream tasks require job results. Otherwise, we can simply return the job_id and manage downstream dependencies in another way, as sketched below.
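For example, a minimal sketch of that second pattern (our illustration, not part of the example DAG below): a submit task pushes the job identifier to XCom as its return value, and a downstream task pulls it and blocks for results. The job_identifier attribute and passing a job id straight to block_until_complete are assumptions about the Modzy SDK; confirm them against the SDK reference.
from airflow.operators.python import PythonOperator

def submit_job(input_text, client):
    inputs = {'airflow_input': {'input.txt': input_text}}
    job = client.jobs.submit_text_bulk('ed542963de', '1.0.1', inputs)
    return job.job_identifier  # the return value is pushed to XCom automatically

def fetch_results(client, ti):
    # Airflow injects ti (the task instance) because the callable declares it
    job_id = ti.xcom_pull(task_ids='submit_job')
    return client.results.block_until_complete(job_id, timeout=None)

# Inside the with DAG(...) block:
# submit = PythonOperator(task_id='submit_job', python_callable=submit_job,
#                         op_kwargs={'client': client, 'input_text': 'Text to analyze.'})
# fetch = PythonOperator(task_id='fetch_results', python_callable=fetch_results,
#                        op_kwargs={'client': client})
# submit >> fetch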
Save this file into your DAG folder as modzy_example.py, and test it out using the Airflow CLI:
airflow tasks list modzy_example --tree
airflow tasks test modzy_example sentiment_analysis 2021-01-01
We hope this simple example can be used and expanded to create more complex Airflow + Modzy integrations.
from datetime import timedelta
from textwrap import dedent

# The DAG object to instantiate a DAG
from airflow import DAG

# Airflow Operators
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Modzy SDK
from modzy import ApiClient

API_URL = "https://app.modzy.com/api"
API_KEY = "<your.key>"  # consider an Airflow Variable or Connection instead of hardcoding
client = ApiClient(base_url=API_URL, api_key=API_KEY)

# Simple function to get the sentiment analysis from Modzy.
# Defined before the DAG so the PythonOperator below can reference it.
def sentiment_analysis(input_text, client):
    inputs = {'airflow_input': {'input.txt': input_text}}
    job = client.jobs.submit_text_bulk('ed542963de', '1.0.1', inputs)
    results = client.results.block_until_complete(job, timeout=None)
    return results

# These args are passed to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'modzy',
    'depends_on_past': False,  # if set to True, later tasks will not run unless the previous run was successful
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0,  # do not try to rerun failed tasks
}

with DAG(
    'modzy_example',
    default_args=default_args,
    description='A simple example DAG for using Modzy with Airflow',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    t1 = PythonOperator(
        task_id='sentiment_analysis',
        python_callable=sentiment_analysis,
        op_kwargs={'client': client, 'input_text': 'This is the amazing statement for analysis. Cool, neat, stupendous!'},
    )
    t1.doc_md = dedent(
        """\
        #### Task Documentation
        This is the markdown documentation of the process
        """
    )
    dag.doc_md = """
    This is documentation for the DAG itself
    """