执行Spark引擎业务

Spark使用交互式页面命令行来执行SQL任务,需要注意的是Spark侧看SparkExtension是否生效需要在SQL语句前加EXPLAIN语句,或者在Spark UI页面查看,如果算子是以Omni开头的则代表SparkExtension生效。

本次任务示例使用tpcds_bin_partitioned_orc_3的数据表作为测试表,测试SQL为TPC-DS测试集的Q82。

相关的表信息如表1所示。

表1 相关表信息

表名

表格式

总行数

item

orc

36000

inventory

orc

28188000

date_dim

orc

73049

store_sales

orc

8639377

  1. 启动Spark-SQL命令行窗口。

    原生Spark-SQL启动命令如下。

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn  --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3
    SparkExtension插件启动命令如下。
    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 3 --executor-cores 21 --executor-memory 5g --master yarn  --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=5g --conf spark.task.cpus=1 --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --driver-java-options '-Djava.library.path=/opt/omni-operator/lib' --conf spark.sql.codegen.wholeStage=false --conf  spark.executorEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/:/usr/local/lib/HMPP" --conf  spark.executorEnv.OMNI_HOME="/opt/omni-operator" --conf spark.driverEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/:/usr/local/lib/HMPP" --conf spark.driverEnv.OMNI_HOME="/opt/omni-operator" --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar --jars /opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar --conf spark.sql.orc.impl=native --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager" --conf spark.sql.join.columnar.preferShuffledHashJoin=true --conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark --database tpcds_bin_partitioned_orc_3

    SparkExtension相关的启动参数信息如表2所示。

    表2 SparkExtension相关启动参数信息

    启动参数名称

    默认值

    含义

    spark.sql.extensions

    com.huawei.boostkit.spark.ColumnarPlugin

    启用SparkExtension。

    spark.shuffle.manager

    sort

    是否启用列式shuffle,若启用则需添加配置项--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager"。ock启用请配置ock自有的shuffleManager类。默认sort走原生的shuffle。

    spark.omni.sql.columnar.hashagg

    true

    是否启用列式HashAgg,true表示启用,false表示关闭。

    spark.omni.sql.columnar.project

    true

    是否启用列式Project,true表示启用,false表示关闭。

    spark.omni.sql.columnar.projfilter

    true

    是否启用列式ConditionProject(Project + Filter融合算子),true表示启用,false表示关闭。

    spark.omni.sql.columnar.filter

    true

    是否启用列式Filter,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sort

    true

    是否启用列式Sort,true表示启用,false表示关闭。

    spark.omni.sql.columnar.window

    true

    是否启用列式Window,true表示启用,false表示关闭。

    spark.omni.sql.columnar.broadcastJoin

    true

    是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.nativefilescan

    true

    是否启用列式NativeFilescan,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sortMergeJoin

    true

    是否启用列式SortMergeJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.takeOrderedAndProject

    true

    是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。

    spark.omni.sql.columnar.shuffledHashJoin

    true

    是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。

    spark.shuffle.columnar.shuffleSpillBatchRowNum

    10000

    Shuffle输出的每个batch中包含数据的行数。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少写入磁盘文件的批次,提升写入速度。

    spark.shuffle.columnar.shuffleSpillMemoryThreshold

    2147483648

    Shuffle内存溢写上限,Shuffle内存上限达到默认值时会发生溢写,单位是Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少shuffle内存溢写到磁盘文件次数,减少磁盘IO操作。

    spark.shuffle.columnar.compressBlockSize

    65536

    Shuffle数据压缩块大小,单位是Byte。请根据实际环境的内存调整参数,建议采用默认值。

    spark.sql.execution.columnar.maxRecordsPerBatch

    4096

    列式Shuffle初始化Buffer大小,单位是Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle读写次数,提升性能。

    spark.sql.join.columnar.preferShuffledHashJoin

    false

    Join发生时,是否优先使用ShuffledHashJoin,true表示启用,false表示关闭。

    spark.shuffle.compress

    true

    Shuffle是否开启压缩。true表示压缩,false表示不压缩。

    spark.io.compression.codec

    lz4

    Shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。

    spark.omni.sql.columnar.sortSpill.rowThreshold

    200000

    sort算子溢写触发条件,处理数据行超过此值触发溢写,单位为Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少sort算子溢写到磁盘文件的次数,减少磁盘IO操作。

    spark.omni.sql.columnar.sortSpill.dirDiskReserveSize

    10737418240

    sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位为Byte。根据实际环境的磁盘容量和业务场景调整参数,建议不超过业务数据大小,取值上限为实际环境的磁盘容量大小。

    spark.omni.sql.columnar.sortSpill.enabled

    false

    sort算子是否开启溢写能力。true表示开启溢写能力,false表示关闭。

  2. 查看SparkExtension是否生效。

    在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。

    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

    SparkExtension输出执行计划如下图,如果算子以Omni开头则证明SparkExtension生效。

    原生Spark-SQL输出执行计划如下图。

  3. 运行SQL语句。

    在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。

    select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

  4. 输出结果对比。

    • 原生Spark-SQL执行结果。

      执行计划如下。

    • SparkExtension插件执行结果。

      执行计划如下。

    • 结果对比。

      原生和SparkExtension的SQL输出数据一致。