Using Apache Airflow to Schedule DLC Engine to Submit Tasks

Last updated: 2025-05-22 15:45:02
This document introduces DLC's support for the Apache Airflow scheduling tool and provides examples demonstrating how to use Apache Airflow to run different types of DLC engine tasks.

Overview

Apache Airflow is an open-source scheduling tool developed by Airbnb, written in Python. It defines and schedules a set of interdependent tasks using Directed Acyclic Graphs (DAGs). Apache Airflow supports sub-tasks written in Python and offers various operators to execute tasks, such as Bash commands, Python functions, SQL queries, and Spark jobs, providing high flexibility and scalability. Widely used in fields like data engineering, data processing, and workflow automation, Apache Airflow allows users to easily monitor and manage the state and execution of workflows through its rich features and visual interface. For more information about Apache Airflow, see Apache Airflow.
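As a minimal illustration of the DAG-plus-operator model described above, the sketch below defines a one-task workflow. The DAG ID and callable are illustrative only; the import style matches the full example later in this document.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def say_hello():
    print('hello from Airflow')


# A single-task DAG: one PythonOperator attached to a DAG definition.
dag = DAG(
    dag_id='hello_dag',            # illustrative DAG ID
    start_date=datetime(2024, 11, 1),
    schedule_interval=None,        # run only when triggered manually
    catchup=False)

hello = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag)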

Prerequisites

1. Prepare the Apache Airflow environment.
a. Install and start Apache Airflow. For detailed steps on installation and startup, see Quick Start.
b. Install the jaydebeapi dependency package: pip install jaydebeapi.
2. Prepare the DLC environment.
a. Enable the DLC engine service.
b. If using the standard Spark engine, prepare the Hive JDBC driver. Click to download hive-jdbc-3.1.2-standalone.jar.
c. If using the standard Presto engine, prepare the Presto JDBC driver. Click to download presto-jdbc-0.284.jar.
d. If using the SuperSQL engine, prepare the DLC JDBC driver. Click to download the JDBC driver.
Once the driver and credentials are ready, you can verify connectivity with the standalone check sketched after this list.
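Before wiring the connection into an Airflow DAG, it can help to confirm that the driver JAR, URL, and credentials work on their own. The sketch below assumes the SuperSQL engine with the DLC JDBC driver and reuses the placeholder URL and JAR path from the example later in this document; replace the placeholders and credentials with your own values.

import jaydebeapi

# Standalone connectivity check (run outside Airflow). The URL placeholders,
# credentials, and JAR path are examples and must be replaced with real values.
jdbc_url = ('jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask'
            '&database_name={dataBaseName}&datasource_connection_name={dataSourceName}'
            '&region={region}&data_engine_name={engineName}')
conn = jaydebeapi.connect(
    'com.tencent.cloud.dlc.jdbc.DlcDriver',  # driver class shipped in the DLC JDBC JAR
    jdbc_url,
    ['<SecretId>', '<SecretKey>'],            # Tencent Cloud API credentials
    '/root/airflow/jars/dlc-jdbc-2.5.3-jar-with-dependencies.jar')
curs = conn.cursor()
curs.execute('select 1')
print(curs.fetchall())                        # expect [(1,)] if the connection works
curs.close()
conn.close()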

Key Steps

Creating Connection and Scheduling Tasks

In the Apache Airflow working directory, create a dags directory, then create a scheduling script inside it and save it as a .py file. In this document, the script is saved as /root/airflow/dags/airflow-dlc-test.py, as shown below:
import time
from datetime import datetime, timedelta

import jaydebeapi
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

jdbc_url = 'jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask&database_name={dataBaseName}&datasource_connection_name={dataSourceName}&region={region}&data_engine_name={engineName}'
user = 'xxx'
pwd = 'xxx'
driver = 'com.tencent.cloud.dlc.jdbc.DlcDriver'
jar_file = '/root/airflow/jars/dlc-jdbc-2.5.3-jar-with-dependencies.jar'


def createTable():
    sqlStr = 'create table if not exists db.tb1 (c1 int, c2 string)'
    conn = jaydebeapi.connect(driver, jdbc_url, [user, pwd], jar_file)
    curs = conn.cursor()
    curs.execute(sqlStr)
    rows = curs.rowcount.real
    if rows != 0:
        result = curs.fetchall()
        print(result)
    curs.close()
    conn.close()


def insertValues():
    sqlStr = "insert into db.tb1 values (111, 'this is test')"
    conn = jaydebeapi.connect(driver, jdbc_url, [user, pwd], jar_file)
    curs = conn.cursor()
    curs.execute(sqlStr)
    rows = curs.rowcount.real
    if rows != 0:
        result = curs.fetchall()
        print(result)
    curs.close()
    conn.close()


def selectColums():
    sqlStr = 'select * from db.tb1'
    conn = jaydebeapi.connect(driver, jdbc_url, [user, pwd], jar_file)
    curs = conn.cursor()
    curs.execute(sqlStr)
    rows = curs.rowcount.real
    if rows != 0:
        result = curs.fetchall()
        print(result)
    curs.close()
    conn.close()


def get_time():
    print('Current time is:', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    return time.time()


default_args = {
    'owner': 'tencent',                   # owner's name
    'start_date': datetime(2024, 11, 1),  # first execution start time, in UTC
    'retries': 2,                         # number of retry attempts on failure
    'retry_delay': timedelta(minutes=1),  # retry interval on failure
}
dag = DAG(
    dag_id='airflow_dlc_test',            # DAG ID; use only letters, numbers, and underscores
    default_args=default_args,            # default parameters applied to every task, as a dict
    schedule_interval=timedelta(minutes=1),  # how often the DAG runs; any timedelta works
    catchup=False,                        # if True (the default), backfills all runs scheduled between start_date and now
)

t1 = PythonOperator(
    task_id='create_table',
    python_callable=createTable,
    dag=dag)

t2 = PythonOperator(
    task_id='insert_values',
    python_callable=insertValues,
    dag=dag)

t3 = PythonOperator(
    task_id='select_values',
    python_callable=selectColums,
    dag=dag)

t4 = PythonOperator(
    task_id='print_time',
    python_callable=get_time,
    dag=dag)

# create the table, then insert; finally run the query and time tasks in parallel
t1 >> t2 >> [t3, t4]


Parameter description:
jdbc_url: JDBC connection address and configuration parameters. For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access.
user: The SecretId of your Tencent Cloud API key.
pwd: The SecretKey of your Tencent Cloud API key.
driver: The JDBC driver class to load. For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access.
jar_file: The storage path of the driver JAR package. Replace it with the absolute path to the JDBC driver JAR for the engine you use. For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access.
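For reference, a filled-in SuperSQL URL could be assembled as follows. The database, data source, region, and engine names here are hypothetical values to be replaced with your own.

# Hypothetical example values; substitute your own database, data source,
# region, and engine names when assembling the SuperSQL JDBC URL.
params = {
    'dataBaseName': 'db',
    'dataSourceName': 'DataLakeCatalog',
    'region': 'ap-guangzhou',
    'engineName': 'test_engine',
}
jdbc_url = ('jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask'
            '&database_name={dataBaseName}&datasource_connection_name={dataSourceName}'
            '&region={region}&data_engine_name={engineName}').format(**params)
print(jdbc_url)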

Running Scheduled Tasks

Access the Airflow web UI, navigate to the DAGs tab, locate the submitted workflow (airflow_dlc_test), and start it to begin scheduling.
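Before starting the workflow, you can optionally confirm that Airflow parses the script without import errors. A quick check, assuming the /root/airflow/dags directory used above:

from airflow.models import DagBag

# Parse the dags folder and report any import errors; an empty dict means
# every DAG file, including airflow_dlc_test, loaded cleanly.
dag_bag = DagBag(dag_folder='/root/airflow/dags')
print(dag_bag.import_errors)
print('airflow_dlc_test' in dag_bag.dags)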


Viewing Task Execution Results

In the DAG graph view, click a task instance and choose Log to view its execution output and confirm each run succeeded.