数据清洗是指对数据进行审查和校验的过程,目的是删除重复信息、纠正错误数据、处理缺失值和异常值,以及统一数据格式,从而提高数据质量,为后续的数据分析和挖掘工作提供可靠的基础。

Spark 是一个强大的分布式计算框架,在处理大规模数据时,数据清洗是必不可少的步骤。以下是 Spark 数据清洗的具体内容和常用方法:

1.  处理缺失值

Spark 提供了多种处理缺失值的方法,适用于 DataFrame 和 RDD:

  (1) 删除缺失值

      当缺失值占比非常小,且对整体数据影响不大时,可以直接删除包含缺失值的记录。但如果缺        失值较多,直接删除可能会导致数据量大幅减少,丢失大量信息,此时需要谨慎使用该方法。

      使用 dropna() 方法删除包含缺失值的行或列。

  (2) 填充缺失值

      可以使用均值、中位数、众数等统计量来填充数值型变量的缺失值;对于分类变量,可使用最        频繁出现的类别来填充。另外,也可以基于相似记录的特征进行填充,例如根据与缺失值所在        记录相似的其他记录的相应特征值来填充。

      使用 fillna() 填充缺失值,可以指定常量、统计值(如均值、中位数)或基于其他列的值

  (3) 插值法

      对于时间序列数据,可以使用 window functions 或自定义函数进行插值

示例代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 删除包含任何缺失值的行
df_clean = df.dropna()

# 使用均值填充数值列的缺失值
mean_age = df.select(mean("age")).collect()[0][0]
df_clean = df.fillna({"age": mean_age, "gender": "Unknown"})

2.  清洗重复数据

数据集中可能存在完全相同的重复记录,或者根据某些关键特征判断为重复的记录。需要通过比较数据的各个字段,找出重复的记录,并根据业务需求保留其中一条,删除其余重复项,以避免重复数据对分析结果产生干扰。

使用 dropDuplicates() 方法删除完全重复的行,或根据指定列判断重复:

# 删除完全重复的行
df_clean = df.dropDuplicates()

# 根据特定列删除重复行
df_clean = df.dropDuplicates(subset=["id", "name"])

3.  处理异常值

对于数值型数据,可以通过计算统计量(如标准差、四分位数)识别并处理异常值:

from pyspark.sql.functions import col, stddev, mean

# 计算均值和标准差
stats = df.select(mean("salary").alias("mean"), stddev("salary").alias("std")).collect()
mean_val = stats[0]["mean"]
std_val = stats[0]["std"]

# 过滤超出3个标准差的异常值
df_clean = df.filter((col("salary") >= mean_val - 3*std_val) & (col("salary") <= mean_val + 3*std_val))

4.  数据类型转换与格式统一

   (1) 类型转换

      使用 cast() 方法转换列的数据类型。

   (2) 日期格式处理

      使用 to_date()date_format() 等函数处理日期。

   (3) 字符串规范化

      使用 trim()lower()regexp_replace() 等函数处理文本。

from pyspark.sql.functions import col, to_date, regexp_replace

# 类型转换
df = df.withColumn("age", col("age").cast("integer"))

# 日期格式转换
df = df.withColumn("date", to_date(col("date_str"), "yyyy-MM-dd"))

# 字符串处理(去除空格并转换为小写)
df = df.withColumn("name", regexp_replace(col("name"), "[^a-zA-Z]", ""))
df = df.withColumn("name", lower(col("name")))

5.  数据标准化与归一化

对于机器学习预处理,常需要对数值特征进行标准化或归一化:

from pyspark.ml.feature import StandardScaler, MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# 将特征转换为向量格式
assembler = VectorAssembler(inputCols=["age", "salary"], outputCol="features")
df = assembler.transform(df)

# 标准化
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df)
df_scaled = scaler_model.transform(df)

6.  数据验证与规则应用

使用 UDF(用户自定义函数)或内置函数应用业务规则:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# 自定义验证函数
def validate_age(age):
    return age > 0 and age < 120

validate_age_udf = udf(validate_age, BooleanType())

# 应用验证规则
df = df.filter(validate_age_udf(col("age")))

7.  处理脏数据与不一致性

对于分类变量的不一致性(如大小写、拼写错误),可以使用正则表达式或字典映射进行修正:

# 修正性别字段中的不一致值
gender_mapping = {"M": "Male", "F": "Female", "male": "Male", "female": "Female"}

def clean_gender(gender):
    return gender_mapping.get(gender, gender)

clean_gender_udf = udf(clean_gender)
df = df.withColumn("gender", clean_gender_udf(col("gender")))

8. 关联数据清洗

处理多个数据集之间的关联时,需确保键的一致性:

# 两个数据集的连接
df1 = spark.read.csv("users.csv", header=True)
df2 = spark.read.csv("orders.csv", header=True)

# 连接前清洗键
df1 = df1.withColumn("user_id", col("user_id").cast("integer"))
df2 = df2.withColumn("user_id", col("user_id").cast("integer"))

# 内连接
joined_df = df1.join(df2, on="user_id", how="inner")

9. 并行处理与优化

Spark 的优势在于分布式计算,可通过调整分区数(repartition() 或 coalesce())优化性能:

# 增加分区以提高并行度
df = df.repartition(100)

# 缓存中间结果
df.cache()

10. 日志与质量监控

记录清洗过程中的统计信息,便于监控数据质量:

# 记录清洗前后的数据量
print(f"原始数据行数: {df.count()}")
print(f"清洗后数据行数: {df_clean.count()}")

# 保存清洗后的数据
df_clean.write.csv("cleaned_data.csv", header=True)

Spark 数据清洗充分利用了分布式计算的优势,通过 DataFrame API、SQL 和 MLlib 提供了丰富的工具。核心流程包括:读取数据、处理缺失值、去重、异常值处理、格式统一、标准化,以及最终的质量验证和输出。根据数据规模和复杂度,可灵活组合这些方法完成高效的数据清洗。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐