プロダクト概要
製品の強み
適用シーン
import timefrom datetime import datetime, timedeltaimport jaydebeapifrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorjdbc_url='jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask&database_name={dataBaseName}&datasource_connection_name={dataSourceName}®ion={region}&data_engine_name={engineName}'user = 'xxx'pwd = 'xxx'dirver = 'com.tencent.cloud.dlc.jdbc.DlcDriver'jar_file = '/root/airflow/jars/dlc-jdbc-2.5.3-jar-with-dependencies.jar'def createTable():sqlStr = 'create table if not exists db.tb1 (c1 int, c2 string)'conn = jaydebeapi.connect(dirver, jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def insertValues():sqlStr = "insert into db.tb1 values (111, 'this is test')"conn = jaydebeapi.connect(dirver,jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def selectColums():sqlStr = 'select * from db.tb1'conn = jaydebeapi.connect(dirver, jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def get_time():print('現在の時間は:', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))return time.time()default_args = {'owner': 'tencent', # 所有者名'start_date': datetime(2024, 11, 1), # 初回実行開始時間、UTC時間'retries': 2, # 失敗時の再試行回数'retry_delay': timedelta(minutes=1), # 失敗時の再試行間隔}dag = DAG(dag_id='airflow_dlc_test', # DAG ID、完全に英数字とアンダースコアで構成する必要がありますdefault_args=default_args, # 外部で定義されたdic形式のパラメータschedule_interval=timedelta(minutes=1), # DAGの実行頻度を定義、日、週、時間、分、秒、ミリ秒で設定可能catchup=False # DAG実行時、開始時刻から現在までのすべてのタスクを実行するかどうか、デフォルトはTrue)t1 = PythonOperator(task_id='create_table',python_callable=createTable,dag=dag)t2 = PythonOperator(task_id='insert_values',python_callable=insertValues,dag=dag)t3 = PythonOperator(task_id='select_values',python_callable=selectColums,dag=dag)t4 = PythonOperator(task_id='print_time',python_callable=get_time,dag=dag)t1 >> t2 >> [t3, t4]
パラメータ | 説明 |
jdbc_url | |
user | SecretId |
pwd | SecretKey |
dirver | |
jar_file | ドライバ jar パッケージの保存パス。対応するエンジンの JDBC ドライバ jar パッケージの絶対パスを置き換える必要があります。詳細については、Hive JDBC アクセス、Presto JDBC アクセス、および DLC JDBC アクセスを参照してください |


フィードバック