字节跳动开源自研 Shuffle 框架——Cloud Shuffle Service

385次阅读  |  发布于2年以前

今天,字节跳动宣布,正式开源 Cloud Shuffle Service。

Cloud Shuffle Service(以下简称CSS) 是字节自研的通用 Remote Shuffle Service 框架,支持 Spark/FlinkBatch/MapReduce 等计算引擎,提供了相比原生方案稳定性更好、性能更高、更弹性的数据 Shuffle 能力,同时也为存算分离/在离线混部等场景提供了 Remote Shuffle 解决方案。

目前,CSS 已在 Github 上开源,欢迎感兴趣的同学一起参与共建!

项目地址:

https://github.com/bytedance/CloudShuffleService

开源背景

在大数据计算引擎中,Pull-Based Sort Shuffle 是一种常见的 Shuffle 方案,比如 Spark/MapReduce/FlinkBatch (高于1.15版本)等都将 Sort Shuffle 作为引擎默认方案,但是 Sort Shuffle 实现机制有一定的缺陷,在大规模生产环境下经常因为 Shuffle 问题影响作业稳定性。

以 Spark 的 Sort Shuffle 为例:

如上图所示链路,Sort Shuffle 会存在以下一些问题:

这些都很容易导致 ShuffleRead 慢或者超时,引起 FetchFailed 相关错误,严重影响线上作业的稳定性,ShuffleRead 慢也会大大降低资源利用率(CPU&Memory),同时 FetchFailed 也会导致 Stage 中相关 Task 重算,浪费大量资源,拖慢整个集群作业运行;无法存算分离的架构,在在离线混部(在线资源磁盘不足)/Serverless 云原生等场景下,也很难满足要求。

字节跳动使用 Spark 作为主要的离线大数据处理引擎,每天线上运行作业数过百万,日均 Shuffle 量 300+PB。在 HDFS 混部&在离线混部等场景,Spark 作业的稳定性经常无法得到保障,影响业务 SLA:

在此背景下,字节跳动自研了 CSS,用来解决 Spark 原生 ESS 方案的痛点问题。自 CSS 在内部上线一年半以来,当前线上节点数 1500+,日均 Shuffle 量 20+PB,大大提高了 Spark 作业的 Shuffle 稳定性,保障了业务的 SLA。

Cloud Shuffle Service 介绍

CSS 是字节自研的 Push-Based Shuffle Service,所有 MapTask 通过 Push 的方式将同一个 Partition 的 Shuffle 数据发送给同一个 CSS Worker 节点进行存储,ReduceTask 直接从该节点通过 CSS Worker 顺序读取该 Partition 的数据,相对于 ESS 的随机读取,顺序读的 IO 效率大大提升。

CSS 架构

Cloud Shuffle Service(CSS) 架构图

CSS Cluster 是独立部署的 Shuffle Service 服务,主要涉及的组件为:

CSS Worker 启动后会向 ZooKeeper 节点注册节点信息,它提供 Push/Fetch 两种服务请求,Push 服务接受来自 MapTask 的 Push 数据请求,并将同一个 Partition 的数据写到同一个文件;Fetch 服务接受来自 ReduceTask 的 Fetch 数据请求,读取对应 Partition 数据文件返回;CSS Worker还负责 Shuffle 数据清理的工作,当 Driver 进行 UnregisterShuffle 请求删除 ZooKeeper 对应 ShuffleId 的 Znode 时,或者 Application 结束删除 ZooKeeper 中 ApplicationId 的 Znode 时,CSS Workers 会 Watch 相关事件对 Shuffle 数据进行清理。

作业启动后会在 Spark Driver 中启动 CSS Master,CSS Master 会从 ZooKeeper 中获取到 CSS Worker 的节点列表,然后为后续 MapTask 产生的各个 Partition 分配 n 个副本(默认为2)的 CSS Worker 节点,并对这些 Meta 信息进行管理,供 ReduceTask 获取 PartitionId 所在的 CSS Worker 节点进行拉取,同时在 RegisterShuffle/UnregisterShuffle 过程中会在 ZooKeeper 中创建对应的 ApplicationId/ShuffleId 的 Znode,CSS Worker 会 Watch Delete 事件对 Shuffle 数据进行清理。

如前描述,用来存储 CSS Worker 节点信息以及 ShuffleId 等信息。

CSS 特性

CSS除了支持 Spark(2.x&3.x) 之外,也可以接入其他引擎,目前在字节跳动内部,CSS 还接入了 MapReduce/FlinkBatch 引擎。

为了解决单个 Partition 太小,Push 效率比较低的问题,实际会将多个连续的 Partition 组合成更大的 PartitionGroup进行 Push。

跟 ESS 类似,MapTask 中的 CSS Buffer 将所有 Partition 的数据都存储在一起,在 Spill 之前会对数据按照 PartitionId 进行排序,然后按照 PartitionGroup 维度进行数据推送;同时 CSS Buffer 完全纳入 Spark 的 UnifiedMemoryManager 内存管理体系,内存相关参数由 Spark 统一管理。

Push 失败:当触发 Spill 进行 Push PartitionGroup 数据时,每次 Push 的数据大小为 4MB(一个Batch),当某次 Push batch 失败时,并不影响之前已经 Push 成功的数据,只需要重新分配节点(Reallocate)继续 Push 当前失败的数据以及后续还未 Push 的数据,后续 ReduceTask 会从新老节点读取完整的 Partition 数据;

多副本存储:ReduceTask 从 CSS Worker 读取某个 Partition 数据是按照 Batch 粒度进行拉取的,当 CSS Worker 异常(如网络问题/磁盘坏等)导致无法获取该 Batch 数据,可以继续选择另外一个副本节点继续读取该 Batch 以及后续 Batch 的数据;

数据去重:当作业开启 Speculative 推测执行会有多个 AttempTask 并发跑,需要在读取的时候进行去重。在 Push Batch 的时候,会给 Batch 数据加上 Header 信息,Header 信息中包含 MapId + AttempId + BatchId 等信息,ReduceTask 读取时可以根据这些 ID 信息进行去重。

CSS 完整支持 AQE 相关的功能,包括动态调整 Reduce 个数/ SkewJoin 优化/Join 策略优化。对于SkewJoin,CSS做了更多的适配优化工作,解决了 Skew Partition 数据被多个 ReduceTask 重复读取问题,大大提高了性能。

CSS 性能测试

我们将 CSS 与开源的 ESS 使用独占 Label 计算资源进行 1TB 的 TPC-DS Benchmark 测试对比,整体端到端的性能提升15%左右,部分 Query 有30%以上的性能提升。

同时我们也使用线上混部资源队列(ESS 稳定较差)进行 1TB 的 TPC-DS Benchmark 测试对比,整体端到端性能提升4倍左右。

CSS 1TB 测试提升 30% 以上的 Query

未来规划

CSS 目前开源了部分 Feature,还有一些 Feature & 优化后续会陆续开放:- 支持 MapReduce/FlinkBatch 引擎;

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8