执行Spark引擎业务
Spark使用交互式页面命令行来执行SQL任务,需要注意的是Spark侧看SparkExtension是否生效需要在sql语句前加explain语句,或者在Spark UI页面查看,如果算子是以Omni开头的则代表SparkExtension生效。
本次任务示例使用tpcds_bin_partitioned_orc_3的数据表作为测试表,测试SQL为TPC-DS测试集的Q82。
相关的表信息如表1所示。
表名 |
表格式 |
总行数 |
---|---|---|
item |
orc |
36000 |
inventory |
orc |
28188000 |
date_dim |
orc |
73049 |
store_sales |
orc |
8639377 |
- 启动Spark-SQL命令行窗口。
原生Spark-SQL启动命令如下。
/usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3
SparkExtension插件启动命令如下。/usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --driver-java-options '-Djava.library.path=/opt/omni-operator/lib' --conf spark.sql.codegen.wholeStage=false --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.driverEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar --jars /opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar --conf spark.sql.orc.impl=native --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager" --conf spark.sql.join.columnar.preferShuffledHashJoin=true --database tpcds_bin_partitioned_orc_3
SparkExtension相关的启动参数信息如表2所示。
表2 SparkExtension相关启动参数信息 启动参数名称
默认值
含义
spark.sql.extensions
com.huawei.boostkit.spark.ColumnarPlugin
启用SparkExtension。
spark.shuffle.manager
sort
是否启用列式shuffle,若启用则需添加配置项--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager"。默认sort走原生的shuffle。
spark.omni.sql.columnar.hashagg
true
是否启用列式HashAgg,true表示启用,false表示关闭。
spark.omni.sql.columnar.project
true
是否启用列式Project,true表示启用,false表示关闭。
spark.omni.sql.columnar.projfilter
true
是否启用列式ConditionProject(Project + Filter融合算子),true表示启用,false表示关闭。
spark.omni.sql.columnar.filter
true
是否启用列式Filter,true表示启用,false表示关闭。
spark.omni.sql.columnar.sort
true
是否启用列式Sort,true表示启用,false表示关闭。
spark.omni.sql.columnar.window
true
是否启用列式Window,true表示启用,false表示关闭。
spark.omni.sql.columnar.broadcastJoin
true
是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。
spark.omni.sql.columnar.nativefilescan
true
是否启用列式NativeFilescan,true表示启用,false表示关闭。
spark.omni.sql.columnar.sortMergeJoin
true
是否启用列式SortMergeJoin,true表示启用,false表示关闭。
spark.omni.sql.columnar.takeOrderedAndProject
true
是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。
spark.omni.sql.columnar.shuffledHashJoin
true
是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。
spark.shuffle.columnar.shuffleSpillBatchRowNum
10000
shuffle输出的每个batch中包含数据的行数。
spark.shuffle.columnar.shuffleSpillMemoryThreshold
2147483648
shuffle内存溢写上限,shuffle内存上限达到默认值时会发生溢写,单位是Byte。
spark.shuffle.columnar.compressBlockSize
65536
Shuffle数据压缩块大小,单位是Byte。
spark.sql.execution.columnar.maxRecordsPerBatch
4096
列式shuffle初始化buffer大小,单位是Byte。
spark.sql.join.columnar.preferShuffledHashJoin
false
Join发生时,是否优先使用ShuffledHashJoin,true表示启用,false表示关闭。
spark.omni.sql.columnar.jit
false
是否启用列式Jit,true表示启用,false表示关闭。启动Jit需要配置OMNI_HOME环境变量。在on yarn上需要添加--conf spark.executorEnv.OMNI_HOME="/opt/omni-operator" --conf spark.driverEnv.OMNI_HOME="/opt/omni-operator" 配置项。
spark.shuffle.compress
true
shuffle是否开启压缩。true表示压缩,false表示不压缩。
spark.io.compression.codec
lz4
shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。
spark.omni.sql.columnar.sortSpill.rowThreshold
200000
sort算子溢写触发条件,处理数据行超过此值触发溢写,单位为Byte。
spark.omni.sql.columnar.sortSpill.dirDiskReserveSize
10737418240
sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位为Byte。
spark.omni.sql.columnar.sortSpill.enabled
false
sort算子是否开启溢写能力。true表示开启溢写能力,false表示关闭。
- 查看SparkExtension是否生效。
在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。
explain select i_item_id ,i_item_desc ,i_current_price from item, inventory, date_dim, store_sales where i_current_price between 76 and 76+30 and inv_item_sk = i_item_sk and d_date_sk=inv_date_sk and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date) and i_manufact_id in (512,409,677,16) and inv_quantity_on_hand between 100 and 500 and ss_item_sk = i_item_sk group by i_item_id,i_item_desc,i_current_price order by i_item_id limit 100;
SparkExtension输出执行计划如下图,如果算子以Omni开头则证明SparkExtension生效。
原生Spark-SQL输出执行计划如下图。
- 运行SQL语句。
在SparkExtension和原生Spark-SQL交互式命令行窗口分别运行以下SQL语句。
select i_item_id ,i_item_desc ,i_current_price from item, inventory, date_dim, store_sales where i_current_price between 76 and 76+30 and inv_item_sk = i_item_sk and d_date_sk=inv_date_sk and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date) and i_manufact_id in (512,409,677,16) and inv_quantity_on_hand between 100 and 500 and ss_item_sk = i_item_sk group by i_item_id,i_item_desc,i_current_price order by i_item_id limit 100;
- 输出结果对比。
- 原生Spark-SQL执行结果。
执行计划如下。
- SparkExtension插件执行结果。
执行计划如下。