中文
注册
我要评分
文档获取效率
文档正确性
内容完整性
文档易理解
在线提单
论坛求助

执行Spark引擎业务

Spark使用交互式页面命令行来执行SQL任务,需要注意的是Spark侧看SparkExtension是否生效需要在sql语句前加explain语句,或者在Spark UI页面查看,如果算子是以Omni开头的则代表SparkExtension生效。

本次任务示例使用tpcds_bin_partitioned_orc_3的数据表作为测试表,测试SQL为TPC-DS测试集的Q82。

相关的表信息如表1所示。

表1 相关表信息

表名

表格式

总行数

item

orc

36000

inventory

orc

28188000

date_dim

orc

73049

store_sales

orc

8639377

  1. 启动Spark-SQL命令行窗口。

    原生Spark-SQL启动命令如下。

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn  --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3
    SparkExtension插件启动命令如下。
    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn  --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --driver-java-options '-Djava.library.path=/opt/omni-operator/lib' --conf spark.sql.codegen.wholeStage=false --conf  spark.executorEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.driverEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar --jars /opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar  --conf spark.sql.orc.impl=native --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager" --conf spark.sql.join.columnar.preferShuffledHashJoin=true --database tpcds_bin_partitioned_orc_3

    SparkExtension相关的启动参数信息如表2所示。

    表2 SparkExtension相关启动参数信息

    启动参数名称

    默认值

    含义

    spark.sql.extensions

    com.huawei.boostkit.spark.ColumnarPlugin

    启用SparkExtension。

    spark.shuffle.manager

    sort

    是否启用列式shuffle,若启用则需添加配置项--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager"。默认sort走原生的shuffle。

    spark.omni.sql.columnar.hashagg

    true

    是否启用列式HashAgg,true表示启用,false表示关闭。

    spark.omni.sql.columnar.project

    true

    是否启用列式Project,true表示启用,false表示关闭。

    spark.omni.sql.columnar.projfilter

    true

    是否启用列式ConditionProject(Project + Filter融合算子),true表示启用,false表示关闭。

    spark.omni.sql.columnar.filter

    true

    是否启用列式Filter,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sort

    true

    是否启用列式Sort,true表示启用,false表示关闭。

    spark.omni.sql.columnar.window

    true

    是否启用列式Window,true表示启用,false表示关闭。

    spark.omni.sql.columnar.broadcastJoin

    true

    是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.nativefilescan

    true

    是否启用列式NativeFilescan,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sortMergeJoin

    true

    是否启用列式SortMergeJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.takeOrderedAndProject

    true

    是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。

    spark.omni.sql.columnar.shuffledHashJoin

    true

    是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。

    spark.shuffle.columnar.shuffleSpillBatchRowNum

    10000

    shuffle输出的每个batch中包含数据的行数。

    spark.shuffle.columnar.shuffleSpillMemoryThreshold

    2147483648

    shuffle内存溢写上限,shuffle内存上限达到默认值时会发生溢写,单位是Byte。

    spark.shuffle.columnar.compressBlockSize

    65536

    Shuffle数据压缩块大小,单位是Byte。

    spark.sql.execution.columnar.maxRecordsPerBatch

    4096

    列式shuffle初始化buffer大小,单位是Byte。

    spark.sql.join.columnar.preferShuffledHashJoin

    false

    Join发生时,是否优先使用ShuffledHashJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.jit

    false

    是否启用列式Jit,true表示启用,false表示关闭。启动Jit需要配置OMNI_HOME环境变量。在on yarn上需要添加--conf spark.executorEnv.OMNI_HOME="/opt/omni-operator" --conf spark.driverEnv.OMNI_HOME="/opt/omni-operator" 配置项。

    spark.shuffle.compress

    true

    shuffle是否开启压缩。true表示压缩,false表示不压缩。

    spark.io.compression.codec

    lz4

    shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。

    spark.omni.sql.columnar.sortSpill.rowThreshold

    200000

    sort算子溢写触发条件,处理数据行超过此值触发溢写,单位为Byte。

    spark.omni.sql.columnar.sortSpill.dirDiskReserveSize

    10737418240

    sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位为Byte。

    spark.omni.sql.columnar.sortSpill.enabled

    false

    sort算子是否开启溢写能力。true表示开启溢写能力,false表示关闭。

  2. 查看SparkExtension是否生效。

    在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。

    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

    SparkExtension输出执行计划如下图,如果算子以Omni开头则证明SparkExtension生效。

    原生Spark-SQL输出执行计划如下图。

  3. 运行SQL语句。

    在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。

    select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
  4. 输出结果对比。
    • 原生Spark-SQL执行结果。

    执行计划如下。

    • SparkExtension插件执行结果。

    执行计划如下。

    • 结果对比。

      原生和SparkExtension的SQL输出数据一致。