背景:一次意外的版本升级

最近,AWS 集群上的 EMR-Serverless 版本被 IT 部门升级到了新版本,这导致原来运行稳定的 PySpark 任务突然无法正常执行。究其原因,是 PySpark 在 EMR-Serverless 这种托管环境下对依赖包的处理方式比较特殊:所有 EMR 之外的依赖都需要打包到 venv 虚拟环境中,并以 zip 格式上传到 S3。

然而,一个令人困扰的问题是:我的两台 Mac 打包的 zip 文件上传后总是报错,而手边又没有 Windows 电脑可用。公司的 Windows 设备受 IT 管控,也不方便让同事安装 Python 环境来帮忙打包。最终不得不求助 AWS 客服来完成打包工作。

这种依赖外部打包的方式带来非常大的不便——每次需要添加新的 pip 包都变得很麻烦, 尤其是在大公司要到处求人。正好通过 Spark Executor 并行解析文件的性能瓶颈也越来越明显,因此我决定从头重构整个流程。


性能瓶颈分析

我们的业务场景是使用 Python 包 asammdf.dat 格式的数据文件解析为 numpy 格式,然后转换成业务所需的数据结构。最初的设计思路是利用 Spark Executor 的并行能力来加速解析过程,但在实际运行中暴露出两个严重的问题:

1. CPU 资源浪费严重

由于 asammdf 是单线程包,每个 Executor 只能有一个 CPU 核心参与解析工作,其他核心处于空闲状态。考虑到解析后的数据量可能达到亿级规模,我们不得不为每个 Executor 分配多核心配置,这导致了严重的资源浪费。

2. 数据倾斜问题

文件大小差异巨大:小文件几分钟就能完成解析,而大文件可能需要 30 分钟甚至更长时间。这导致整个 Job 被最慢的 Task 拖累,大量 Executor 在完成自己的任务后处于等待状态。

架构重构方案

经过权衡,我决定将流程拆分为两个独立的阶段:

  1. 文件解析阶段:单独完成 .dat 文件的解析和预处理
  2. 数据处理阶段:定期通过 Spark Job 批量处理解析后的数据

对于文件解析阶段,考虑到依赖包体积较大,我放弃了 Lambda Function 方案(受 250MB 部署包限制),转而选择 AWS Fargate 实现并发解析。每个 Fargate 实例只需分配 1 核 4GB 内存即可满足需求。

然而,这个方案也遇到了新的挑战:每个 Fargate 实例都需要连接 MySQL 数据库来更新文件的处理状态。当 Fargate 实例数量达到 VPC IP 池上限时,出现了 MySQL 连接数过多的问题,导致服务不可用。这个经历让我意识到,在 Serverless 架构中,数据库连接池管理是一个需要特别注意的问题。


Spark 开发环境搭建

坑点一:Spark 与 Hadoop 版本冲突

在开始 Scala 开发之前,我首先要解决版本兼容性问题。基于之前的经验,我知道 EMR 中预置了 Spark 和 DeltaLake 相关组件,因此不需要将这些依赖打入 Fat Jar,可以避免潜在的版本冲突(曾经因为打包了 Hudi 而不是使用环境自带的版本,导致与 HBase 冲突,无法正常连接)。

EMR 7.8 的组件版本如下:

  • Spark: 3.5.4
  • Hadoop: 3.4.1
  • Delta Lake: 3.3.0
  • Scala: 2.12.18

由于在 EMR-Serverless 上提交和测试代码不够便捷,我选择先在本地通过 spark-submit 进行调试。

问题排查过程

一开始代码总是报 Spark 版本冲突,检查了 pom.xml 和依赖包,配置明明都是 Spark 3.5.4 和 Hadoop 3.4.1。直到我想起来检查本地环境:

$ spark-submit --version
# 发现是通过 brew 安装的 Spark 4.0

问题找到了!spark-submit 使用的是本地的 Spark 4.0,自然会产生版本冲突。

尝试通过 brew install spark@3.5.4 指定版本,但 Homebrew 找不到这个版本的 formula。于是转向 Apache Spark 官网下载对应版本,但安装后依然报错,这次错误信息变成了 Hadoop 3.3.6 版本冲突。

经过一番查找,发现问题根源:Spark 3.5.4 预编译版本内置的是 Hadoop 3.3.6,而 EMR 使用的是 Hadoop 3.4.1。

解决方案

正确的做法是从 Apache Spark 官网下载 “without Hadoop” 的 Spark 3.5.4 版本,然后在本地单独安装 Hadoop 3.4.1。这样就能确保本地开发环境与 EMR 环境完全一致。

至此,本地开发环境终于搭建完成,可以开始处理 EMR 特定的配置问题了。


坑点二:神秘的 Iceberg JDK 版本冲突

EMR Serverless 可能因为用户基数不大,加上成本与传统 EMR 集群相差不大,导致相关文档相对匮乏,官方文档中的许多配置项描述也比较模糊。

问题现象

在提交阶段,我为了省事让 AI 生成了一个 spark-submit 命令,其中包含了一些 JVM 相关配置。由于提交时一直报版本错误,我直接在配置中硬编码了 JVM 版本。

本地测试一切正常,打包也没有问题。但提交到 EMR Serverless 后,诡异的错误出现了:

Exception in thread "main" java.lang.UnsupportedClassVersionError:
org/apache/iceberg/spark/source/IcebergSource has been compiled by a
more recent version of the Java Runtime (class file version 55.0),
this version of the Java Runtime only recognizes class file versions up to 52.0

排查过程(一波三折)

  1. 检查打包配置:Maven POM 文件配置正确,反复测试了 JDK 8、11、17 三个版本,问题依旧

  2. 尝试升级 JDK:计划升级到 JDK 11,但遇到 Scala 兼容性问题。经查证,Scala 2.12 在编译时不支持指定 JDK 版本,只能使用 JDK 8。要使用 JDK 11 必须升级到 Scala 2.13,但 EMR 集群只支持 2.12(放弃)

  3. 检查 IDE 配置:反复修改 IDEA 中的 Scala 和 JDK 配置,以及 EMR Serverless 的 additional configuration 中的 JDK runtime version,问题依旧

  4. 检查 JAR 包依赖:解压上传的 JAR 包,确认其中并没有任何 Iceberg 相关的类(项目仅依赖 DeltaLake)。检查 DeltaLake 的传递依赖,也没有发现 Iceberg

  5. 精简 Fat Jar:考虑到 EMR Serverless 采用类似 Spark on K8s 的架构,镜像是动态拉取的,无法直接查看内置 lib 的版本。尝试精简 Fat Jar,仅保留 Jackson 包,其他全部移除,问题依旧

  6. 寻求 AWS Support:得到了一个可以正常运行的 POM 配置示例(运行 Word Count),终于看到了希望

  7. 对比测试:启动了一个传统 EMR 集群,使用相同的 JAR 包通过 spark-submit 测试,可以正常运行!这说明问题确实出在 EMR Serverless 的特定配置上

  8. 最终发现:反复测试后,终于定位到问题根源——EMR Serverless 控制台的 Spark Properties 配置中,有一个由 AI 生成的参数:

-conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/

问题总结

删除这个 JAVA_HOME 配置后,Iceberg 版本冲突问题彻底解决。至于为什么一个环境变量配置会引发 Iceberg 的类加载错误,目前还不完全清楚,可能与 EMR Serverless 的类加载机制有关。

教训:对于 AI 生成的配置参数,即使看起来”很常见”,也要仔细验证其必要性。能省略的参数就省略,保持配置的最小化原则,这样才能更好地控制和排查问题。


坑点三:Submit 参数与 Driver Properties 的优先级陷阱

解决了 Iceberg 问题后,小数据集测试一切正常。但在测试生产环境的大数据集时,系统报告资源不足。

问题现象

最初怀疑是 Dynamic Allocation 配置问题,导致系统不断申请新资源,直到耗尽集群预设的资源上限仍在继续申请。然而反复调整 Dynamic Allocation 参数后,问题依旧,而且错误信息越来越诡异:

  • 配置的初始资源:10 个实例 × 8 核 × 16GB = 80 核
  • 错误信息显示:尝试申请 29,588 个 CPU 失败

排查过程

通过 Spark Application UI 反复检查,发现几乎所有 Executor 运行正常,唯一的异常就是在申请天文数字般的 CPU 资源。

与 AWS Support 深入排查后,终于发现了问题:代码中遗留了一个测试用的 properties 文件。这个文件在 Driver 初始化时被读取,其中的配置覆盖了提交时的参数。有趣的是,即使 properties 文件中配置的总核数也不过 300 左右,与错误信息显示的 29,588 核相差巨大。

关键发现

我原本以为 spark-submit 的参数会覆盖代码中初始化 SparkSession 时的配置,但实际情况恰恰相反:

配置优先级:Driver 初始化时的 SparkConf > spark-submit 参数 > 默认配置

删除代码中初始化 SparkSession 时设置的 memory 和 cores 相关配置后,任务终于能够正常运行。


经验总结与最佳实践

这次从 PySpark 到 Scala 的重构之路虽然坎坷,但多少也有一些收获。作为 EMR Serverless 用户,虽然踩了不少坑,但也积累了一些宝贵的经验。

1. 选择合适的开发语言

相比 PySpark,Scala 开发在 EMR Serverless 环境下有明显优势:

  • 更好的调试体验:编译型语言的错误更容易定位
  • 更少的部署问题:不需要处理 Python 虚拟环境打包的跨平台问题
  • 更好的性能:特别是在 CPU 密集型任务中

2. 配置管理的黄金法则

核心原则:配置集中化,参数最小化

在 Serverless 环境中:

  • 将所有配置项集中在一处(推荐使用外部配置文件或提交参数)
  • 代码中避免硬编码任何资源配置
  • 遵循”最小配置”原则,不确定的参数宁可不写
  • 避免在多个地方(代码、提交脚本、控制台)重复配置

因为你永远不知道哪个”看似无害”的参数会在某个时刻导致整个应用无法运行。

3. 谨慎使用 AI 生成的配置

虽然 AI(如 Context7 + Claude Sonnet 4)可以帮助生成命令和配置,但:

  • AI 倾向于”防御性编程”,会添加大量”以防万一”的配置
  • 不同云平台对参数的内部处理机制不同,AI 无法完全掌握
  • 建议:将 AI 生成的配置作为参考,自己逐一验证必要性,从最小配置开始测试

AI 在配置生成方面的问题:它会尝试覆盖各种可能的错误场景,但很多场景从业务底层就不会发生,这反而增加了系统的复杂性和不可控性。

4. Serverless 服务的架构设计原则

对于 AWS Serverless 服务(Lambda、Fargate、EMR Serverless),设计时应遵循:

  • 简单优于复杂:将复杂任务拆分为多个简单任务
  • 松耦合设计:减少依赖,便于独立测试和部署
  • 关注黑盒限制:了解服务的隐藏限制(如 IP 池、连接数等)
  • 逐步验证:先在小规模环境验证,再逐步扩展

依赖越多,黑盒越多,问题越难排查。即使求助 AWS Support,也可能因为涉及多个服务的交互而难以给出明确答案。

5. 版本管理的严格性

开发环境与生产环境的版本必须严格一致:

  • Spark、Hadoop、Scala 版本要精确匹配
  • 使用”without Hadoop”版本的 Spark 可以更灵活地控制依赖
  • 建立版本对照表,避免因版本差异导致的”本地可以,云上不行”

写在最后

EMR Serverless 作为一个相对新的服务,文档和最佳实践还在不断完善中。虽然这次经历充满挑战,但最终重构后的系统在稳定性和性能上都有显著提升。

希望这篇文章能帮助到同样在 EMR Serverless 上开发的同学,少走一些弯路。如果你也有类似的踩坑经历,欢迎在评论区分享交流!