目录

关于Spark中读取,计算和写入造成结果异常的场景分析

前言

Spark是目前主流的分布式计算框架之一,本文谈在使用Spark进行计算时与结果正确性相关的一些issue的场景以及分析。

计算可以分为三个过程,数据读取,数据计算,数据写入,本文就从这三个部分来阐述可能遇到的问题以及规避方案(如有错误,请指正)。

数据读取

首先,spark在生产中最常用的使用场景就是spark-sql。在spark-sql中,使用 hive 的metastore进行元数据存储,因此在使用中,往往是spark DataSource表和hive表共存。

表类型

表的类型,可以分为以下几种。

  • hive(managed/external)表: 如果不指定location,是一个managed表,直接存储在数据库对应的目录下,如果进行drop 操作,会将对应的数据删掉。如果对表指定location,是一个External表,数据存在指定的路径下,如果进行drop操作,不会删除对应的数据,这样相对来说会更安全一些,减小一些误操作造成数据丢失的风险。
  • spark DataSource(managed/external)表: 使用spark DataSource创建的表

parquet是一种列式存储格式,在数据库场景中可以在查询时过滤掉不必要的数据,适用于读多写少的场景。

Spark选择了parquet作为常用的存储格式,因此在生产中,最常见的表就是parquet表。

parquet是一种存储格式,既然是存储,就会有写入和读取的过程,也就是序列化和反序列化。

在Spark-sql场景中,有两种parquet 版本,一种是spark内置的parquet,一种是hive内置的parquet版本,往往hive中的parquet版本较老,而spark中的parquet较新,其序列化和反序列化性能更好,但是可能会出现一些不兼容的情况。

下面谈一下创建spark DataSource表和hive表的方式。

Spark Datasource 表

  • 使用Spark DataFrame API进行创建
  • 使用using parquet的方式创建
    • 例如: create table ta (id int, name string) using parquet

Hive 表

  • 使用stored as parquet进行创建
    • 例如: create table ta(id int, name string) stored as parquet

如果想判断一张已经建的表是hive表还是spark DataSource表可以使用 show create table命令查看信息。

  • hive表 可以看到其INPUTFORMAT/OUTPUTFORMAT是 hive开头的。

    CREATE TABLE `ta`(`id` int, `name` string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    WITH SERDEPROPERTIES (
      'serialization.format' = '1'
    )
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    TBLPROPERTIES (
      'transient_lastDdlTime' = '1569816513'
    )
    
  • Spark DataSource表

    CREATE TABLE `ta` (`id` INT, `name` STRING)
    USING parquet
    OPTIONS (
      `serialization.format` '1'
    )
    

兼容性问题与规避方案

前面通过show create table命令也看到了,hive表就是使用hive 的parquet序列化方式。而spark由于自带的parquet性能更加卓越,所以在spark中有一个参数,spark.sql.hive.convertMetastoreParquet, 其默认是true,表示将使用spark内置的parquet的序列化和反序列化去读取使用hive语法创建的hive表,而非使用hive内置的parquet序列化和反序列化。

所以,有时候就会出现使用spark读取hive表时数据全为null的情况(spark中遇到数据解释不了,或者overflow,默认就是返回null)。

这时候,可以将spark.sql.hive.convertMetastoreParquet设为false,来解决这个问题。

数据计算

关于数据计算,分析一个关于Decimal 计算异常的问题。

关于Decimal 和Decimal计算精度参数

介绍一下Decimal类型。

Decimal是数据库中的一种数据类型,不属于浮点数类型,可以在定义时划定整数部分以及小数部分的位数。对于一个Decimal类型,scale表示其小数部分的位数,precision表示整数部分位数和小数部分位数之和。

一个Decimal 类型表示为Decimal(precision, scale),在Spark中,precision和scale的上限都是38。

对于一个double类型,其可以精确的表示小数点后15位,有效位数位16位。而Decimal类型相对于double类型可以更加精确的表示保证数据计算,例如对于一个Decimal(38, 24)类型,其可以精确的表示小数点后23位。

下面介绍spark.sql.decimalOperations.allowPrecisionLoss参数。

当该参数为true(默认),表示允许丢失精度,会根据Hive行为和SQL ANSI 2011规范来决定result的类型,即如果无法精确的表示,则舍入结果的小数部分。

当该参数为false时,代表不允许丢失精度,这样会将数据表示的更加精确。

场景分析

介绍一下这个场景。下面的语句:

set spark.sql.decimalOperations.allowPrecisionLoss=false;
select case when 1=2 then 1 else 1.123456789012345678901234 end * 1; 
//结果为空

set spark.sql.decimalOperations.allowPrecisionLoss=true;
select case when 1=2 then 1 else 1.123456789012345678901234 end * 1; 
//结果是 1.12345678901234568,丢失了部分精度,因为允许丢失精度。

我们将上面语句的执行计划打印出来。

"== Physical Plan ==
*(1) Project [null AS (CASE WHEN (1 = 2) THEN CAST(1 AS DECIMAL(34,24)) ELSE CAST(1.123456789012345678901234 AS DECIMAL(34,24)) END * CAST(1 AS DECIMAL(34,24)))#170]
+- Scan OneRowRelation[]"

执行计划很简单,里面有一个二元操作(乘法),左边的case when 是一个Decimal(34, 24)类型,右边是一个Literal(1)。

程序员都知道,在编程中,如果两个不同类型的操作数做计算,会将低级别的类型向高级别的类型进行类型转换,Spark中也是如此。

一条SQL语句进入Spark-sql引擎之后,要经历Analysis->optimization->生成可执行物理计划的过程,而这个过程就是不同的Rule作用在Plan上面不断作用,然后Plan随之转化的过程。

在Spark sql中有一系列关于类型转换的Rule,这些Rule作用在Analysis阶段的Resolution子阶段。

我们来看一下其中一条Rule, ImplicitTypeCasts 中和BinaryOperator相关的代码。

// sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
case b @ BinaryOperator(left, right) if left.dataType != right.dataType => 
  findTightestCommonType(left.dataType, right.dataType).map { commonType => 
    if (b.inputType.acceptsType(commonType)) { 
      // If the expression accepts the tightest common type, cast to that. 
      val newLeft = if (left.dataType == commonType) left else Cast(left, commonType) 
      val newRight = if (right.dataType == commonType) right else Cast(right, commonType) 
      b.withNewChildren(Seq(newLeft, newRight)) 
    } else { 
      // Otherwise, don't do anything with the expression. 
      b 
    } 
 }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged. 

解释一下上面的代码,针对一个BinaryOperator(例如 + - * /), 如果左边的数据类型和右边不一致,那么会寻找一个左右操作数的common type, 然后将左右操作数都转换为common type。针对我们此处case中的 Decimal(34, 24) 和Literal(1), 它们的common type就是Decimal(34, 24),所以这里的Literal(1)将被转换为Decimal(34, 24)。

这样该二元操作的两边就都是Decimal类型。接下来这个二元操作会被Rule DecimalPrecision中的decimalAndDecimal方法处理。由于该二元操作是乘法操作,我们看乘法操作部分的代码。

// sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
case Multiply(e1 @ DecimalType.Expression(p1, s1), e2 @ DecimalType.Expression(p2, s2)) =>
  val resultType = if (SQLConf.get.decimalOperationsAllowPrecisionLoss) {
    DecimalType.adjustPrecisionScale(p1 + p2 + 1, s1 + s2)
  } else {
    DecimalType.bounded(p1 + p2 + 1, s1 + s2)
  }
  val widerType = widerDecimalType(p1, s1, p2, s2)
  CheckOverflow(Multiply(promotePrecision(e1, widerType), promotePrecision(e2, widerType)), resultType)

此处我们的操作数已经都是Decimal(34, 24)类型,所以p1=p2=34, s1=s2=24.

如果不允许精度丢失,那么resultType就是 DecimalType.bounded(p1+p2+1, s1+s2), bounded方法代表precision 和 scale都不能超过38,所以这里的ResultType就是Decimal(38, 38), 也就是小数部分为38位,那么整数部分就只剩下0位来表示,也就是说如果整数部分非0,那么这个结果就会overflow。在当前版本中,如果发生Decimal Operation 计算发生了overflow,那么就会返回一个Null的结果。

解决方案

解决此问题,可以合入PRSPARK-29000来解决在非Decimal和Decimal之间操作数转化时,精度转换不当的问题,合入 SPARK-23179 来在Decimal计算overflow时抛出一个异常来提醒用户计算出现问题,让用户感知。

有兴趣的话,可以查看具体的分析和解决方案描述Spark Sql Decimal Precision Overflow Analysis

数据写入

分析一下在数据写入时候会发生的异常。

场景A

前面提到了外部表,可以在进行drop操作的时候不删数据。但这可能也会造成一个问题。

对于一个外部分区表。

如果我们先drop掉这张表的一个分区,然后再overwrite这个分区,可能会造成数据重复。

下面是一个可复现的场景。

// 创建外部分区表
create external table test(id int) partitioned by (name string) stored as parquet location 'file://path';
// overwrite一个分区n1, 也因此创建了这个分区
insert overwrite table test partition(name='n1') select 1;
// drop n1这个分区
ALTER TABLE test DROP PARTITION(name='n1');
// overwrite n1这个分区
insert overwrite table test partition(name='n1') select 2;

通过测试,发现在spark-2.3版本,进行上述操作,最后select这张表,得到的结果如下:

Id Name
1 n1
2 n1

这个结果是异常的,正确的结果应该只有一条 2, n1.

P.S. 在master分支,对hive table做了一些优化,如果将spark.sql.hive.convertMetastoreParquet设为true(此时会将InsertIntoHiveTable的操作转换为使用Spark DataSource的写入)是可以得到正确的结果,但是如果将spark.sql.hive.convertMetastoreParquet设为false,依然会得到上述异常数据。

其实这是Hive的一个bug,相关patch为 HIVE-17063

暂时规避方案,就是在对外部表做overwrite partition操作前,先不要进行drop partition操作(已提jira,希望可后续解决)。

场景B

此场景发生在使用Spark file source方式对表进行写入(InsertIntoHadoopFsRelation)操作时候。

FileOutputCommitter

Spark对HDFS的写入实现,依赖于Hadoop 的FileOutputCommitter。

简单介绍一下FileOutputCommitter。首先其有一个outputPath和一个committer 算法版本,1或者2。

其会有一个$tablePath/_temporary/number(对于Spark来说是$tablePath/_temporary/0)作为一个working 目录(存放中间数据),task在未完成之前的数据在这个working目录中进行。

如果committer算法版本为1,task完成之后会首先commit到一个 task_attempt_output目录(在_temporary/0下面),在所有task完成之后会将所有task_attempt_output 下面的数据commit到outputPath中,这是一个二阶段提交。

而如果committer算法版本为2,那么task完成之后会直接commit到最终目录里,这是一个非二阶段提交,所以会产生应用失败,但是部分数据写入的问题,但是由于其是一次写入到最终目录,所以性能较版本1要好。

目前Spark默认的commit算法版本是1.

表写入

表按照是否有分区来划分,可以分为分区表和非分区表。

针对非分区表,spark在进行写入时候的working目录都是 $tablePath/_temporary/0.

针对分区表,spark在写入时候会判断这个写入操作是否是dynamicPartitionOverwrite。如果是,则其working目录是$tablePath/.spark-staging-${UUID},也就是不会重复的,每次都独一无二的。如果不是,则其working目录还是$tablePath/_temporary/0

介绍一下dynamicPartitionOverwrite。对分区表分区进行overwrite分为static 和dynamic两种类型。

在Spark中相关参数为spark.sql.sources.partitionOverwriteMode(hive中也有对应的参数),默认为static,代表不允许进行dynamicPartitionOverwrite,如果设为dynamic代表允许dynamicPartitionOverwrite。

dynamic代表你不必指定所有partition key的值,由spark来根据结果,确定你要overwrite哪些partition,因此其在数据计算完成之前,不会去删表中的分区。

而static代表,你必须指定你要overwrite哪些分区,所以需要被overwrite的分区是可确定的,因此会在操作开始的时候就把对应的分区删除掉。

比如下面的这个查询:

insert overwrite table tablea partition(p1=v1,p2=v2,p3) select ...

如果是static的overwrite,其会首先删掉 表下面的 p1=v1/p2=v2分区(会删掉下面所有的p3子分区),然后在数据计算完之后,将数据写入。

如果是dynamic的overwrite,其不会首先删掉表下面的 p1=v1/p2=v2分区,而是会根据计算的结果去判断,我应该删掉哪些子分区。

比如说在进行overwrite之前, p1=v1/p2=v2下面有 p3=v31p3=v32两个子分区,而实际select ...语句产生的结果只有p3=v31的结果。

如果是static overwrite,会先把p3=v31p3=v32两个子分区都删掉,而dynamic overwrite,只会在最后根据计算结果只overwrite p3=v31这个分区。

关于表的写入背景知识介绍到这里,下面介绍具体场景。

应用被kill掉,working数据未被清理

如果应用appA 是对tableA一个static partition overwrite,其由于某个task hang住,然后被kill掉,所以其working目录($tableA/_temporary/0)没有被清理掉.

而我们在其被kill掉之后,又重新跑这个应用,新的应用继续使用$tableA/_temporary/0作为working目录,之后新应用运行成功,但是提交时候将上次遗留的一些task的数据提交到最终目录,造成数据重复。

针对这个场景,暂时解决方案,我们需要在应用被kill之后,手动清理_temporary/0.

两个应用并发写入一个表

如果在kill应用的时候,resourceManager发生了异常,造成了我们以为应用已经被kill掉,然后我们又重新提交了一样的应用去写数据。两个应用会共用_temporary/0,互相干扰,可能造成结果异常。

针对这个场景,我们可能不容易察觉。

其实即使是dynamic partition overwrite 会用独一无二的working目录,其在多个操作并发写入同一张表时,仍然可能会发生干扰冲突。

关于场景B中问题的长期解决方案,PR SPARK-29037 正在致力于解决这个问题,希望可以解决数据重复以及在多个操作并发写入一张表可能造成干扰的问题。