DLC PySpark

Last updated: 2024-11-01 16:26:14
Note:
Before you create a task, bind a DLC engine. DLC PySpark currently supports the Spark job engine. For engine kernel details, see DLC Engine Kernel Version.

Feature Overview

You can create a DLC PySpark task in WeData and submit it to the WeData scheduling platform and the DLC engine for execution.

Task parameter description

In the task properties of a DLC PySpark task, you can configure the data access policy, entry parameters, dependent resources, Spark conf parameters, task image, and resource configuration.

| Parameter name | Parameter description |
| --- | --- |
| Data access policy | Required. The security policy used to access COS data during task execution. For details, refer to DLC Configuration Data Access Policy. |
| Entry parameters | Optional. The entry parameters of the program. Multiple parameters are supported and must be separated by spaces. |
| Dependent resources | Optional. Supports --py-files, --files, and --archives. Each resource type accepts multiple COS paths, separated by commas (,). |
| Conf parameters | Optional. Parameters starting with spark., in k=v format. Put each parameter on its own line. Example: spark.network.timeout=120s. |
| Task image | The image used to run the task. If the task requires a specific image, you can choose between a DLC built-in image and a custom image. |
| Resource configuration | Using cluster resource configuration: uses the cluster's default resource configuration parameters. Custom: sets the task's resource usage yourself, including executor size, driver size, and number of executors. |
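The formats described for entry parameters (space-separated) and Conf parameters (one spark.* k=v pair per line) can be illustrated with a minimal sketch. It assumes that entry parameters reach the script as ordinary command-line arguments, as they do with a standard spark-submit; this page does not state how DLC passes them. The argument names `--db` and `--date` and both helper functions are hypothetical.

```python
import argparse
import shlex


def parse_entry_parameters(argv):
    """Parse entry parameters given as a list of tokens (hypothetical names)."""
    parser = argparse.ArgumentParser(description="DLC PySpark entry parameters (sketch)")
    parser.add_argument("--db", default="dlc_db_test_py")  # hypothetical parameter
    parser.add_argument("--date", default=None)            # hypothetical parameter
    return parser.parse_args(argv)


def parse_conf_lines(text):
    """Parse newline-separated k=v lines; reject keys that do not start with 'spark.'."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue  # skip blank lines
        key, sep, value = line.partition("=")
        key = key.strip()
        if not sep or not key.startswith("spark."):
            raise ValueError(f"not a spark.* k=v line: {raw!r}")
        conf[key] = value.strip()
    return conf


# Entry parameters as they would be typed in the task properties: separated by spaces.
args = parse_entry_parameters(shlex.split("--db dlc_db_test_py --date 2024-11-01"))

# Conf parameters as they would be typed: one k=v pair per line.
conf = parse_conf_lines("spark.network.timeout=120s\nspark.executor.memory=4g")
```

In a real task, the script would call `parse_entry_parameters(sys.argv[1:])` instead of splitting a literal string. `parse_conf_lines` is only a local check of the input format; the engine applies the Conf parameters itself.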

Sample code

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create (or reuse) the Spark session for this job.
    spark = SparkSession \
        .builder \
        .appName("Operate DB Example") \
        .getOrCreate()

    # 1. Create the database.
    spark.sql("CREATE DATABASE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py` COMMENT 'demo test'")
    # 2. Create an internal table.
    spark.sql("CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`test`(`id` int, `name` string, `age` int)")
    # 3. Write data to the internal table.
    spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`test` VALUES (1,'Andy',12),(2,'Justin',3)")
    # 4. Query the internal table.
    spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`test`").show()
    # 5. Create an external table backed by COS (replace cos-bucket-name with your own bucket).
    spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`ext_test`(`id` int, `name` string, `age` int) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'cosn://cos-bucket-name/ext_test'")
    # 6. Write data to the external table.
    spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` VALUES (1,'Andy',12),(2,'Justin',3)")
    # 7. Query the external table.
    spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`ext_test`").show()

    # Release the session's resources.
    spark.stop()
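The sample code repeats the backtick-quoted catalog.database.table name in every statement. One possible way to reduce that repetition is to build the name once, as in the sketch below. The helpers `quote_ident` and `qualified_name` are hypothetical and are not part of PySpark or DLC; the sketch simply rejects names that contain a backtick rather than trying to escape them.

```python
def quote_ident(name):
    """Wrap one identifier in backticks (hypothetical helper)."""
    if not name or "`" in name:
        raise ValueError(f"unsupported identifier: {name!r}")
    return "`" + name + "`"


def qualified_name(catalog, database, table):
    """Build a three-part name such as `catalog`.`database`.`table`."""
    return ".".join(quote_ident(part) for part in (catalog, database, table))


# Names taken from the sample code above.
TABLE = qualified_name("DataLakeCatalog", "dlc_db_test_py", "test")
SELECT_SQL = f"SELECT * FROM {TABLE}"
INSERT_SQL = f"INSERT INTO {TABLE} VALUES (1,'Andy',12),(2,'Justin',3)"
```

The resulting strings can be passed to `spark.sql(...)` in the same way as the literal statements in the sample.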

