绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
阅读代码:Spark 与 Flink 中的 RPC 实现
2020-07-07 12:56:51

近日常有同学来问我如何阅读代码,关于这个问题的一般性答案我特别提了一个问题并自问自答。出于提供一个实际的例子的考量,正好此前综合地阅读了 Spark 的 RPC 实现、Flink 基于 Akka 的 RPC 实现和 Actor Model 的通信模型,写成本文分享我阅读分布式计算系统 Spark 和 Flink 中的 RPC 实现的过程和思考。

简介 Actor Model 及 Akka 的问题

通常来说,阅读代码的流程是首先了解自己要阅读的代码解决了什么问题,这个问题的现有解决方案是什么,有什么优势和缺点。大致清楚了这些背景之后再在走读代码的过程中思考阅读的代码具体是怎么解决这个问题的,后专注到重点难点的代码块的理解上。也就是说,代码阅读重要的不是代码。代码只是将思考的结果转换为实际可用的软件的手段,思考的结果或者说解决问题的方法才是重要的内容。

分布式计算系统的分布式特性决定了设计过程中必然会考虑节点间的通信问题,即笼统的 RPC 需求。关于 RPC 和 RMI 及 Actor Model 具体的差别本文不做展开,主要集中在 Spark 和 Flink 的 RPC 实现来介绍 Actor Model 下的 RPC 实现。

Actor Model 的主要概念包括

  • 通信的主体 Actor
  • 通信的内容 Message
  • 单线程先到先处理的消息处理器 Mailbox

特别需要提及的是 Actor 之间的通信是通过类似于地址的 ActorRef 来引用其他的 Actor 的,同时,在实现中,需要一个支持 Actor Model 运行起来的 ActorSystem 环境。这些具体的概念和名词属于 Akka,我们会在后面看到它们如何在 Spark 和 Flink 中被一一对应。

Actor Model 一个很少被注意的特点是它的建模过程中只存在 tell 这一个通信原语,ask 等等只是构建在 tell 上层的方便的通信模式。这就导致一个问题,即 Actor Model 原生的编程模式是明显不同于传统的编程模型的。传统的编程模型中,函数调用是有返回值的,即使采用 Future 作为返回值的占位符,本质上还是有一一对应的返回值的;而在 Actor Model 中,消息一经发出就被遗忘,即所谓的 fire and forget 模式。要建立当前发出的消息和稍后收到的消息之间的 ask and answer 关系,需要额外的工作。这部分的内容可以参考 Akka 官方文档中介绍通信模式的章节,本身可以作为 Akka 佳实践的一部分,有时间我会专门写一篇文章介绍 Actor Model 下完全被颠覆的编程模型以及通过在其上模拟常见的编程模型来探索 Actor Model 的佳实践。

关于更多 Actor Model 的概念性和介绍性资料,可以参考的资料有 Akka 的官方文档《反应式设计模式》等等。

Akka 作为目前成熟的 Actor Model 的实现之一,以及拥有容易理解的单线程 Actor 和并发通信模型,广泛地充当了 JVM 系的分布式系统的 RPC 层。Akka 近的演化有两个重点,一个是类型化(Typed)的 Akka,另一个是在拆分行为(Behavior)和状态(State)的概念。前者我们后面看到 Spark 和 Flink 的 RPC 实现时就能看到选择标准的不同,后者这里不作展开,可能会在后续讨论函数式编程的文章中再次提及。

尽管 Akka 的实现非常成熟,但是直接使用 Akka 的底层 Actor Model 的软件却不多。对于业务软件来说,Akka Model 过于底层,如果要利用它带来的好处通常会直接使用 Akka Streams 和 Akka HTTP 等上层建筑;对于其他分布式系统来说,主要有以下两个问题。

个问题是两层集群的负担。如果我们使用 Akka 作为底层 RPC 的实现,本身 Akka 会有相应的基础组件,包括 ActorSystem 或者进一步使用 Akka Cluster 的话相应的 Cluster 对象。我们的分布式系统例如 Spark 和 Flink 本身有自己的集群管理策略,在 Spark 中有 Driver 和 Worker 的概念,在 Flink 中有 JobManager 和 TaskManager 等概念。如果在处理本身系统的集群管理的同时还要兼顾底层的 Akka 集群,这样两层的集群在实际开发和运维的过程当中会带来额外的复杂性。尤其是 Akka 作为一个功能复杂的重量级框架,并且在 Typed Akka 中做出了限制公开的直接沟通两个 Actor 的能力,强制要求使用 Akka Cluster 的决定。同时处理两层集群复杂的状态机和角色与消息的转换将会是一个巨大的负担。

第二个问题是版本的负担,这也是 Spark 走向去 Akka 化的直接原因,也是 Flink 社区经常被提问的一个问题。我们知道,为了保证分布式系统的稳定性,它依赖的组件尤其是 RPC 实现这样底层模块的依赖版本会保持相当的稳定性。这样就有一个问题,Spark 和 Flink 的用户在使用它们的同时也很有可能使用 Akka,并且依赖的是另一个 Akka 的版本。这样,就会出现版本不同带来的不兼容性问题。通常来说,这一点可以通过发布一个项目专有的第三方依赖并使用 shaded 技术重定位包名来解决问题。但是由于重定位为了覆盖反射调用,是在字节码级别对全限定名和字符串的包名前缀做替换。一般来说,包名都是诸如 org.apache.spark 或者 org.apache.flink 的形式,具有性,替换起来不会有什么问题。Akka 就不一样了,它的包名是 akka.actor 等等,跟配置名称是一样的。这就导致重定位会错误改动代码中的配置名字符串导致运行时字符串失配出错。版本问题在 Lightbend 全家桶里是不存在的,例如 Play 通过接口暴露底层的 Akka 数据结构,并固定依赖到某一个版本,这样使用 Play 的人需要 Akka 的功能是只需要通过接口拿到对应的 Akka 数据结构就可以,但是这种方式并没有考虑和其他系统的版本兼容问题。

虽然上述问题可以通过定制 ClassLoader 并精心调整打包策略来绕过,或者要求用户程序使用跟系统框架兼容的 Akka 版本,但是这会导致复杂不友好的用户体验,而清楚简单的用户体验很多时候比功能更能决定一个框架的生存空间。同时,Akka 提供的很多功能,例如 Actor Model 基石的监督(Supervise)功能,对于上层提供 Failover 机制的 Spark 和 Flink 来说是多余的。前有用户体验的硬性需求,后有开发轻量化的敏捷需求,Ligetbend 系以外的成熟的分布式系统开发自己的 RPC 实现是理所当然的选择。

理解了 Spark 和 Flink 为什么要开发自己的 RPC 实现之后,我们再看到 RPC 实现具体的考量点和内容。

Spark 的 RPC 实现

Spark 开发自己的 RPC 实现以换下 Akka 的理由主要是上面提及的版本依赖问题,在社区中记录为 SPARK-5293

阅读相关代码,首先我们要定位代码的位置。Spark 的 RPC 实现主要位于 core 模块下的 org.apache.spark.rpc 这个包下,阅读代码的过程中通过跳转到定义和查找使用点可以找到完整的脉络。结果而言,除了实际的 RPC Endpoint 实现之外,主要相关的代码还包括 common/network-common 路径下网络传输层相关的底层支持。

Spark 的 RPC 实现虽然是为了替换 Akka 而诞生的,但是它实际上可以看成一个简化版的 Akka,仍然遵循许多 Actor Model 的抽象。例如

  • RpcEndpoint 对应 Actor
  • RpcEndpointRef 对应 ActorRef
  • RpcEnv 对应 ActorSystem

RpcEndpoint 与消息处理模型

这其中从模型上来说简单的反而是 RpcEndpoint,因为所有的实现逻辑是具体实现类的事情,它其实只是一个简单的存根(Stub)。总的来说,RpcEndpoint 有以下接口

private[spark] trait RpcEndpoint {
  final def self: RpcEndpointRef = ???
  final def stop(): Unit = ???
  val rpcEnv: RpcEnv = ???

  def receive: PartialFunction[Any, Unit] = ???
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = ???

  def onError(cause: Throwable): Unit = ???
  def onConnected(remoteAddress: RpcAddress): Unit = ???
  def onDisconnected(remoteAddress: RpcAddress): Unit = ???
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ???

  def onStart(): Unit = ???
  def onStop(): Unit = ???
}

可以看到,上面的函数我分成了四组,其中组是和元操作有关的,望文生义不做解释;第三组是连接和错误处理相关的回调函数,主要是记录日志和处理边界情况的,也不用多做介绍;第四组实现的比较多,虽然和第三组一样是挂载在特定事件上的回调函数,但是 RpcEndpoint 启动和关闭时常常需要做状态初始化和终结,以及资源的申请和释放,所以 onStartonStop 是经常被实现的接口。

这里在展开说一下第二组接口。首先是 receive,这个接口跟 Akka 里面 Actor 的 receive 是类似的,我们看到它的签名是 PartialFunction[Any, Unit],也就是说实现起来类似于下面的代码块。

override def receive: PartialFunction[Any, Unit] = {
  case Message => ...
  case BoxedMessage(msg1, msg2) => ...
}

可以看到和 Untyped Akka 别无二致,也就是说 Spark 的 RPC 实现也是非类型化的,编程模型上基于消息和模式匹配来做的。后面我们会看到 Flink 对这一点做了不同的选择,介绍完 Flink 的情况后我们会做一个总的探讨。

另一个接口就比较有意思了,receiveAndReply 实现了接收信息后返回的功能。由于没有实现 Akka 中上下文 sender() 的逻辑,Spark 使用了另一个接口来处理需要返回的调用。我们分两点说明 sender() 的问题和 Spark 基于 Actor Model 实现了传统的具有返回值的调用的方式。

点,sender() 主要的问题是,它是一个方法调用,而不是一个确定性的值。这是函数式编程的拥趸喜欢讨论的话题,即在不同的时刻调用 sender() 会返回不同的值。乍一看我们在每次处理一条消息的时候都调用 sender() 获得当前消息的发送来源并没有问题,不过这个方法在 Akka 社区给新手带来了不少麻烦。

大的问题还是上面提到的调用点的问题。通常来说,由于 Actor Model 中的 Actor 是单线程的处理消息的,你在同一个消息处理过程中多次调用 sender() 返回的都是当前消息的来源。不过,在一个常见的场景中,你在处理消息的时候发起了另一个异步动作,在异步动作中调用 sender() 来获取当前消息的来源。由于异步动作触发的时间是未知的,实际上当它触发时再次调用 sender() 的时候,可能返回的就是另一条消息的来源了。这个问题很好解决,即用一个变量保存当前的 sender() 后面传递这个对象而不是再次调用 sender() 获取对象。显然,Spark 的 receiveAndReply 中的参数 context 就是这个可用于发回消息的上下文,与 sender() 类似。而在 Typed Akka 中,由于 sender() 无法确切的类型化,因此采用的是将消息来源直接编码在发送的消息中的方式以在需要的时候使用它回复消息,这要求 ActorRef 在不同的 ActorSystem 上正确的序列化和反序列化。

第二点,我们看到这里的时候就会想,那我现在有两个 receive 函数,虽然我可以根据需不需要发送回复消息把消息处理逻辑拆分到不同的函数里,但是 Spark 又是怎么知道应该把入站的请求分配到哪个函数的呢?这个就涉及到 Spark RPC 实现的细节。简单的说我们先看到调用两个 receive 函数的片段。

// Inbox.scala
class Inbox {
  def process(dispatcher: Dispatcher): Unit = {
    // ...
    message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case e: Throwable =>
                context.sendFailure(e)
                throw e
            }

          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          case OnStart =>
            endpoint.onStart()
            // ...

          case OnStop =>
                        // ...
            endpoint.onStop()
            assert(isEmpty, "OnStop should be the last message")

          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
    // ...
  }
}

简单扫过回调系列函数,我们看到 Spark RPC 判断将消息转往何处主要是看消息的类型是 RpcMessage 还是 OneWayMessage。从名字中我们就可以看出,前者指的是调用并返回的消息,后者是 fire and forget 的消息。我们跳转到定义并查找初始化点,可以发现生成这两种不同信息的差异的根源发生在 RpcEndpointRef 是调用 ask 还是 send 上,在的 Netty 实现上一路会经过 NettyRpcEnv 对应的 asksend 方法,生成不同的消息发送到远端。这也就是前面说的 Spark 原生的支持 ask 语义的意思。从熟悉的变成模型出发,可以把 ask 当成返回值不为 void 的函数或者 Pascal 中的 function,send 当成返回值为 void 的函数或者 Pascal 中的 procedure。

send 的语义是比较清楚的,关于 ask 的语义还有一个值得讨论的点。我们知道 ask 会有一个返回值,这个返回值是真正有意义的返回值的占位符 Future,而 Future 一般的处理方式在经过拼接和转换之后终究是会有一个 onSuccess 或者 onFailure 的触发动作,这个触发在哪个线程上执行是很重要的。这涉及到我们在编写 receive 函数的时候对异步行为和同步策略的判断。Spark 的实现类似于 Akka 中 AskPattern 引入 PromiseActorRef 的方式,生成一个 Promise 并在对应的返回收到时完成,这个 Promise 作为 ask 的返回值。相关的回调逻辑发生在 NettyRpcEnv#askAbortable 中,可以看到,本地消息中 Promise 的完成发生在发送消息的同一个线程上,而远端消息中 Promise 的完成一路探查到 TransportClient 和 TransportChannelHandler 可以发现完成在 Netty 的 channelRead0 上,也就是说,Spark 的 ask 返回的 Future,其完成的时间点并不一定和 RpcEndpoint 的主线程同步。这可能会导致在不加同步策略下的一些问题,例如通过 ask 询问一个远端节点的状态和远端节点主动 send 过来的状态同时触发状态处理逻辑而导致竞态条件。补充说明,Spark 的 RpcEndpoint 本身也可能并发的处理消息,仅当它是 ThreadSafeRpcEndpoint 或 IsolatedRpcEndpoint 时才表现出类似于 Actor Model 下单线程 Actor 的行为。上面提到的 ask 导致竞态条件的问题在 Akka 中也存在,这倒不算 BUG,只是在使用的时候需要注意采用合适的同步策略。

RpcEnv 与消息分派模型

接下来我们看到更接近 RPC 实现的核心的代码。RpcEnv 是正确的处理 RpcEndpoint 存在和运行及其支持的网络环境的上下文,目前 Spark 中只有基于Netty 的实现。

对于服务端来说,RpcEnv 是支持 RpcEndpoint 正常运行的环境,调度线程处理消息并负责 RpcEndpoint 的生命周期管理;对于客户端来说,可以使用 RpcAddress 等方式从 RpcEnv 中获取可用的本地或远端的 RpcEndpointRef,这是一个 RpcEndpoint 的位置透明的引用或者叫句柄,可以通过调用它的 send 或 ask 方法来向 RpcEndpoint 发送信息。

对于消息的分派,我们从消息的入站和出站来看。

首先看出站,即本 RpcEnv 向 RpcEndpoint 发送消息。注意这里如果是本地的 RpcEndpoint,会将消息直接通过 Dispatcher 分派到本地的 RpcEndpoint 上,严格来说不算出站。如果是远端的 RpcEndpoint,NettyRpcEnv 会通过 postToOutbox 方法,对于 ask 来的方法的回复,构造的消息来源 RpcEndpointRef 会带有网络层的 client,因此是直接返回;而对于本地直接出站的消息,则会根据接收者的地址放入 Outbox 的队列中。一个地址对应着一个 Outbox,在 Outbox 中的消息异步的被取出并发送。

接着看入站,入站的消息会统一先由 NettyRpcEnv 交给 Dispatcher,Dispatcher 在根据消息的元数据分派到对应的处理 RpcEndpoint 上。Dispatcher 中每一个和 RpcEndpoint 一一对应的地址都会被关联上一个 MessageLoop,类似于 EventLoop 它会负责处理发给 RpcEndpoint 的初步分派后的消息。每个 RpcEndpoint 实际绑定的消息处理触发器是 Inbox,Inbox 相当于 Actor Model 中的 Mailbox,负责接收外部发到当前 RpcEndpoint 即 Actor 的消息。DedicatedMessageLoop 只服务于一个 RpcEndpoint,因此它也只持有一个 Inbox,当消息由 Dispatcher 发给 DedicatedMessageLoop 之后,它就转发给的 Inbox;SharedMessageLoop 可服务于多个 RpcEndpoint,所以它的内部有一个 RpcEndpoint 地址对应到 Inbox 的映射,收到 Dispatcher 初步分派后的消息后它会再次进行分派发送到具体的 RpcEndpoint 中。这种 MessageLoop 的设计对应的是一般的 RpcEndpoint 和 IsolatedRpcEndpoint,主要是提供不同的同步保证和线程配置。

具体到 Inbox 的消息就比较直接了,抛开状态管理和异常管理不谈,主要的内容就是一个同步的先进先出的队列处理发布进来的消息,如上一节代码片段所贴,终根据消息的类型调用 RpcEndpoint 的不同方法。

Flink 的 RPC 实现

现在我们转过头来看 Flink 的 RPC 实现。总的来说 Flink 的 RPC 实现依然是基于 Akka 的,这一点与 Spark 基于 Netty 开发的一套不同。Flink 社区有去掉 Akka 依赖的计划,但进度只是 FLINK-4346 把接口抽象出来的程度,其底层实现仍然是 Akka,并没有解决一开始我们提到的使用 Akka 带来的问题。

我们看到 FLINK-4346 描述的目标,先从整体上了解它的设计方向。

It should address the following issues:
- Add type safety to the sender and receiver of messages. We want proper * methods to be called, rather than having generic message * and pattern matching everywhere. This is similar to typed actors.
- Make the message receivers testable without involving actors, i.e. the methods should be callable directly. When used with other component, the receiver will be wrapped in an actor that calls the methods based on received messages.
- We want to keep the paradigm of single-threaded execution per "actor"

首先我们可以看到的是它仍然强调了 Actor Model 的核心之一,单线程的 Actor 消息处理。其次,我们可以看到和 Spark 有两个重要的不同点。

其一是不同于 Akka 的 testkit 套路,Flink 强调远端调用和本地调用在编程模型上的统一性,从而可以在不引入 Actor 一套的情况下直接调用 Actor 的方法来进行测试。这一点实际上跟 RMI 是比较相似的,可以创建一个本地的对象调试,需要访问远端对象的时候就创建一个远端对象的引用。关于这个调用编程模型上的统一性,后面讲到 RpcGateway 和 RpcEndpoint 以及反射调用的时候会看到细节,总的来说这一套类似于 Akka 社区已经放弃的 Typed Actors 实现 Actor Model 类型化的方案。

其二是类型化,上面我们提到的编程模型本身跟类型关系不大。Flink 为了更好的实现防御性编程,期望在调用对应的远端方法的时候能够使用上类型系统的优势来保证参数和返回值的类型匹配,其中主要是返回值的匹配和对应的 RpcGateway 不像无类型的 ActorRef 或 RpcEndpointRef 一样难以判断哪些消息是合法的。不过由于 FLINK-4346 的历史局限性,它借鉴了当时 Typed Actors 的实现方案,这个方案后来被废弃。

由于不需要像 Akka 或 Spark 那样从 Netty 或者 Aeron 这样的网络层框架重新搭建消息分派系统,Flink 的讨论主要集中在它复刻 Typed Actors 的代码和线程模型上。

RMI 式的类型化 RPC 实现

Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中。由于复用了 Akka 的基础设施,它并不像 Spark 那样直接依赖传输层的实现,也不需要自己的分派信息。上次 Flink 的 PMC Chair Stephan Ewen 来北京,和他交流的时候确认了 Flink 只把 Akka 作为 RPC 底层来用,并没有使用 Akka 丰富的监督等其他功能,并且在未来有去掉 Akka 依赖的计划。

Flink 的 RPC 实现的主要抽象包括

  • ActorSystem 的封装 RpcService
  • Actor 与 RpcEndpoint 两层之间的胶合层 RpcServer
  • 业务逻辑的载体 RpcEndpoint
  • RpcEndpoint 的位置透明的引用 RpcGateway
  • 迷之线程模型辅助接口 MainThreadExecutable 和 MainThreadExecutor

可以看到,这个 Spark 和 Akka 基本一一对应的骨架是不一样的,主要的矛盾点在 RpcServer 这一层上。这是因为相比于前两者直接实现 Actor 或其等价物,Flink 的 RPC 实现是基于 Akka 的 Actor 实现了自己的 Actor 等价物 RpcEndpoint,这就导致模型的对应关系适配。

这个问题我们谈到 RpcServer 的具体代码的时候再提。Flink 的代码不能像 Spark 那样按照不同的类型来看,因为类的实现可能涉及到反射访问另一个类,这种情况下按照功能点来阅读代码会更好理解。

我们首先看到上面抽象的构造过程。后的辅助接口放在下一节讲,其他的抽象构造过程分别如下。

RpcServices 目前的实现 AkkaRpcService 是 Akka 的 ActorSystem 的封装,基本可以理解成 ActorSystem 的一个适配器。所以其构造过程也比较简单,就是将适配的对象引用保存后返回。复杂的是由 RpcServices 构造的 RpcServer。

RpcServer 的构造有两个触发点。我们先看到连接远端的 RpcEndpoint 时通过 RpcServices#connect 构造的 RpcServer。这个方法的两个重载的区别只在于是否实现 fencing 的功能,即区分监听同一地址的不同任期的 RpcEndpoint。由于 Flink 的 JobManager 等 RpcEndpoint 会通过主节点选举选出主节点,监听同一个地址的可能是节点的不同任期,而上一个任期的请求的回复应该被过滤掉以免影响当前任期的节点状态。这点先简单带过,我们看到 connect 除此以外的共同部分,摘要如下。

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
  String address,
  Class<C> clazz,
  Function<ActorRef, InvocationHandler> invocationHandlerFactory
) {
  // ...
  final ActorSelection actorSel = actorSystem.actorSelection(address);
  final Future<ActorIdentity> identify = Patterns
            .ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
            .mapTo(ClassTag.apply(ActorIdentity.class));

  final CompletableFuture<ActorRef> actorRefFuture = FutureUtils.toJava(identify).thenApply(
    (ActorIdentity actorIdentity) -> {
      if (actorIdentity.getRef() != null) {
        return actorIdentity.getRef();
      } else ...
    });

  final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
            (ActorRef actorRef) -> FutureUtils.toJava(
                Patterns.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
                    .mapTo(ClassTag.apply(HandshakeSuccessMessage.class))));

  return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (actorRef, ignored) -> {
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
                ClassLoader classLoader = getClass().getClassLoader();
                C proxy = (C) Proxy.newProxyInstance(
                    classLoader,
                    new Class<?>[]{clazz},
                    invocationHandler);

                return proxy;
            },
            actorSystem.dispatcher());
}

连接的过程主要分成三个阶段,个阶段是通过 ActorSelection 和 Identify 找到和地址字符串对应的远端 Actor 的引用,接着发送握手消息确保远端的 Actor 正常工作,随后将这个 ActorRef 打包为一个 InvocationHandler 并转换为对应类型的代理后返回。这里前两个阶段都是 Akka 的基本操作,这里重点介绍一下后一个阶段,并说明它就是所谓的 RMI 式的 RPC 实现。

InvocationHandler 本身是 Java 内置的接口,其定义如下。

public interface InvocationHandler {
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
}

这个接口是给 Java 内置的代理功能使用的,invoke 方法的三个参数分别代表方法的接收方、方法引用和参数列表,或者我们用参数名简单的对应到方法调用,那就是 proxy.method(args) 这样的形式。我们看到穿件 Proxy 那一行的官方文档注释。

public class Proxy {
  /**
   * Returns an instance of a proxy class for the specified interfaces
   * that dispatches method invocations to the specified invocation
   * handler.
   *
   * <p>{@code Proxy.newProxyInstance} throws
   * {@code IllegalArgumentException} for the same reasons that
   * {@code Proxy.getProxyClass} does.
   */
  public static Object newProxyInstance(
    ClassLoader loader,
    Class<?>[] interfaces,
    InvocationHandler h
  ) throws IllegalArgumentException {
    // ...
  }
}

可以看出,所谓的代理对象就是可以处理 interfaces 定义的类型能接受的调用,并把这些调用转交给 InvocationHandler 来处理的对象。我们在调用 connect 方法时传递的 class 参数即是 RpcGateway 的一个子接口,而 RpcGateway 的子接口例如 JobMasterGateway 或 TaskExecutorGateway 则定义了 JobMaster 或 TaskExecutor 这个 RpcEndpoint 所能接受的调用。通过这种方法我们将产生一个跟特定的 RpcGateway 的子接口语义相同的对象,而这个对象所有的 InvocationHandler 在 Flink 中的实现恰好就是 RpcServer 的有效实现。RpcServer 本身也是 RpcGateway 的一个子接口。后这一点其实没有太多的理论支持,只是为了让编译通过和更好的处理 InvocationHandler#invoke 的逻辑所做的妥协,这也是 Flink 的 RPC 实现因为阻抗适配而带来的理解难度其中之一。

我们说到,在 Flink 里面,InvocationHandler 和 RpcServer 实际上指的是同一种东西,它们的实现只有两个,AkkaInvocationHandler 和 FencedAkkaInvocationHandler,后者如前所述与 fencing 相关,不做过多分析。我们从前者触发,主要的逻辑出现在排除了本身方法调用之后的实际代理工作 invokeRpc 方法上。这个方法的流程如下。

  1. 从所代理的方法的签名中查出可能的 RpcTimeout 的参数位置并抽取参数,这个超时时间主要是为了 ask 功能提供的。
  2. 构造相应的 RpcInvocation 消息,这个是 Flink 专用的的 Actor 类型 AkkaRpcActor 能识别的消息类型。根据 RpcGateway 或者说 RpcServer 的位置分别产生 LocalRpcInvocation 消息或 RemoteRpcInvocation 消息。两者的主要区别在于是否支持序列化,因为只有发往远端的消息才需要考虑序列化相关的事项。这个消息包含了方法调用的元信息,即方法名,参数列表和参数类型列表。由于发送的对象是确定的,就是和 RpcServer 保存的 ActorRef 对应的对象,因此不需要指定方法接收者。
  3. 根据方法的返回值类型进行不同的处理。如果是 void 即无返回值,则进行 tell 后返回;如果是 CompletableFuture 则进行 ask 后转换返回的 Future 的类型后返回 Future;如果是其他非 CompletableFuture 的返回值,则类似于前者,但是阻塞在 Future 上等待取得返回值后返回。

可以看到,在这里,方法调用被转换成了 RpcInvocation 这样的方法调用元信息,在远端接受到这些信息后通过反射进行调用,具体可以参照 AkkaRpcActor#handleRpcInvocation 的内容。从外表上看,开发者拿到 RpcGateway 的代理对象后,就像操作它们的子接口,例如 JobMasterGateway 一样,调用其接口,例如 registerTaskManager 或 offerSlot 等。而实际的操作经过 AkkaInvocationHandler 的解释变成发到一个 RpcEndpoint 的消息,这个过程与 RMI 是异曲同工的。

RpcServer 的另一个构造点是 RpcServices#startServer,在的实现 AkkaRpcServices 里它会根据传入的 RpcEndpoint,解析这个具体的 RpcEndpoint 子类的对象实现的接口,通过 ActorSystem#actorOf 创建对应的 AkkaRpcActor 并拿到 ActorRef 后同 connect 后阶段一样构造出 RpcServer 的代理对象。这个代理对象由于实现了代理 RpcEndpoint 的方法的逻辑,也即它所实现的 RpcGateway 的逻辑,所以在 RpcEndpoint#getSelfGateway 的时候也可以被强转成对应的 RpcGateway 来返回。

RpcEndpoint 的构造就比较简单了,是直接的调用构造函数的构造,其基础构造函数如下。

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

可以看到就是在这里发起对 RpcService#startServer 的调用。

总的来说,Flink 的 RPC 实现概念混乱,试图实现 Actor Model 但是概念对应上由于其本质上是在 Actor Model 上糊了一层 Actor Model 但是又复用了底层的 ActorSystem 而导致说不清道不明,阻抗失配。同时,在编程上依赖巧合,例如代理同时是 RpcServer 又是 RpcGateway 的子接口,依赖反射,以及下一节中会讲到的同一功能多种暴露手段。可以说是一个勉强能用但是扩展困难,出现问题难以排查而且性能绝非优的实现。

MainThreadExecutable 与线程模型

上一节中提到 Flink 的 RPC 实现出现了同一功能的多种暴露手段,也出现了【迷之线程模型辅助接口 MainThreadExecutable 和 MainThreadExecutor】这样的字眼。这一节就展开的介绍下 Flink RPC 的线程模型。

首先,Flink 的 RPC 实现是基于 Akka 的,所以 Akka 的 Dispatcher 以及上层的 tell 和 ask 的线程模型是一样的。这里主要神奇的是 MainThreadExecutable 这个接口。我们先看到它的定义。

public interface MainThreadExecutable {
  void runAsync(Runnable runnable);
  <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout);
  void scheduleRunAsync(Runnable runnable, long delay);
}

同上,这里有一个 Fenced 的子接口,但是主要与 fencing 相关,不做展开。我们看到这个接口的方法,猜想是我们可以将一个 Runnable 或者 Callable 交给一个此接口的实现去异步地执行。实际情况确实有点像,我们看到它的实现,欸,的实现居然是老朋友,概念糅合的集中点,AkkaInvocationHandler 这个类。真巧。

它的实现除去一些边界条件和检查代码概要如下。

@Override
public void runAsync(Runnable runnable) {
  scheduleRunAsync(runnable, 0L);
}

@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {
  if (isLocal) {
    long atTimeNanos = delayMillis ==  ?  : System.nanoTime() + (delayMillis * 1_000_000);
    tell(new RunAsync(runnable, atTimeNanos));
  } else {
    throw new RuntimeException(...);
  }
}

@Override
public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
  if (isLocal) {
    return(CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
  } else {
    throw new RuntimeException(...);
  }
}

至于 RunAsync 和 CallAsync 的处理逻辑则存在于 AkkaRpcActor 中,简单地说,当 AkkaRpcActor 收到这个消息时,如果是 schedule 且未到时间就会调度到 Dispatcher 线程中等待,否则立即执行,对于 callAsync 也就是 ask,还会通过 tell 返回结果。再结合上面的代码,我们发现,喔,原来这个方法调用只能在 local 的情况下使用,而且进一步看其上层在 RpcEndpoint 处暴露的接口是 protected 的。在实际应用的时候,callAsync 和 scheduleRunAsync 基本没人用,runAsync 则用的不少,如果你熟悉 Akka 的话,你会发现这基本上和 self() ! Msg 没有太大的差别。

那么为什么 Flink 要引入这个接口呢?下面我从 Flink 的 RPC 实现三处和线程模型有关的接口来对比 Flink 的实现,其中有一处就是这里的 runAsync 等。

另一处是 getSelfGateway 方法。我们刚才说,runAsync 基本上就是 self() ! Msg,那么 getSelfGateway 不就是这里的 self() 吗?嗯,确实是的。Flink 的 getSelfGateway 方法主要用于测试的时候测试代码拿到一个 RpcServer 或者你直接管他叫 ActorRef 来进行消息发送或者适配对应的类型签名,另一个主要的作用则是在本地不同的 Actor 之间传递 ActorRef,远端我们有 RpcService#connect 方法来搞定,本地原则上也可以这么搞,不过 getSelfGateway 看起来更方便一点,也减少了 ActorSelection 和后续确认和握手的来回消息传递。那么 runAsync 和这个玩意到底有什么区别呢?答,通过 RMI 的方法进行调用,方法必须拥有姓名,而 runAsync 可以神奇的传递 Runnable 而使得你不需要去改 RpcGateway 的接口就可以给自己发消息。底层实际上就是一个通用的 RunAsync 消息。着实神奇。如果你的 runAsync 的内容是调用一个 RpcGateway 上注册的方法,那么你实际上也可以写成 getSelfGateway 然后通过点语法调用这个方法。

再另一处是 RpcEndpoint#getMainThreadExecutorRpcEndpont#getRpcService.getExecutor() 这两个通常在拼接 CompletableFuture 的 Async 系列方法时作为 Executor 传入。其中前一个实际上就是上面提到的 runAsync 的又一层包装,当你将它作为 Executor 传入的时候,我们看看它的 execute 和 schedule 方法是怎么写的。

private final MainThreadExecutable gateway;

public void runAsync(Runnable runnable) {
  gateway.runAsync(runnable);
}

public void scheduleRunAsync(Runnable runnable, long delayMillis) {
  gateway.scheduleRunAsync(runnable, delayMillis);
}

@Override
public void execute(@Nonnull Runnable command) {
  runAsync(command);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
  FutureTask<Void> ft = new FutureTask<>(command, null);
  scheduleRunAsync(ft, delayMillis);
  return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
}

所以其实这就是在外面像在里面一样调用 runAsync 的一层接口。

后者则是直接拿到 Akka ActorSystem 中的 Dispatcher 作为 Executor,将动作发布到 ActorSystem 的 Dispatcher 中去调度执行。这里有一点要特别注意的是前面提到的 MainThreadExecutor,它执行时的线程就是 Dispatcher 当中的一个哦。所以如果你把两个竞争的调用,一个放在 MainThreadExecutor 里跑,一个放在 ActorSystem Dispatcher 里跑,那么就有可能会出现死锁,而且这个死锁不是必现的,甚至执行顺序关系都不是引发死锁决定性的因素,首先看你发布到 Dispatcher 的作业是否被分派到 MainThreadExecutor 那个线程上。由于 Flink 把这两个 Executor 直接暴露出来,并且非常方便获得,两者很容易误用,所以相关的并发错误在历史的进程中,尤其是 Dispatcher 和 JobMaster 那一块,综合上其他原因,发生并且被修复过很多次。

Flink 的 RPC 实现把一个简单的 tell 和 ask 的模型,先是搞成 RMI 导致性能堪忧并且实现高度依赖难以理解、维护和扩展的反射,再是暴露出多个功能重复的接口,活生生的把 Java 写成了 Perl 的模样,降低了开发者犯错的难度,增加了开发者犯错的几率。虽然我个人很喜欢 Perl,但是在一个几百万行的大型项目里混进一块 Perl 风格的代码,恐怕还是敬谢不敏。不过另一个角度说,Flink 能把事情搞得这么复杂,但是系统还算能正常的工作,也算 Java 作为一门开发语言的魅力所在。

阅读代码的技巧简述

上面就是我阅读 Spark 和 Flink 的 RPC 实现代码的过程和思考。由于评论性的文字已经内联在阅读的过程中,而且这点篇幅其实还远远没有展开一些有趣的或者关键的技术细节,这里就不再做评述。

回到初的问题,不少同学来问我代码怎么阅读。其实代码本身是人的思维的具象化的一种表现,阅读代码不应该只是去读代码本身,一行一行的看它的执行路径,这样与机器何异?阅读代码首先应该思考的是所要阅读的代码解决了什么问题,这个问题为什么存在,它的现有解法和一般解法是什么。了解到这些基本信息之后,在阅读代码的过程中,对于同质化的部分就略读或者通读过去,对于配置和错误处理和边界情况扫读,重要的边界情况再单独看看。主要精力集中在差异化的部分,对比差异化的部分的考量点,分清孰优孰劣,或者在软件开发的过程中,通常没有一方完全好过另一方,有的只是权衡(trade off)。例如在上面的内容中,好像我把 Flink 说得一无是处,那主要是因为我的工作跟它相关,每天深受这些坑折磨,实际上类型化是一件非常有意义的事,Untyped Akka 和 Spark 当你面对一堆只能靠名字来猜他背后是啥的 Ref 的时候,其实你也是在依赖命名约束或者叫命名巧合来编程。

另外,这里推荐一篇介绍 Spark RPC 的文章,它与本文的不同除了范围以外,主要是更加偏重实践,有作者本人的脚手架仓库可以实验,并且做了时序图和 UML 类图。

图表是非常好的表意手段,写作本文时我原本想引用一个 Actor Model 的概念图,但是一时找不到了。对于 Spark 和 Flink 的 RPC 实现,一张粗略的类责任链和所属关系图也会一图胜千言。不过时序图和 UML 类图恐怕还是太古板和复杂了,就算画得出来,我也高度怀疑到底有谁没事盯着那玩意看。粗略的类图是可以的,UML 类图容易关注点失焦;时序图像我在上面分析线程模型和同步策略的时候,对应的时序关系是要分析的。但是事无巨细的时序图恐怕没有必要,毕竟客户端到服务端大体就是那么回事。如前所述,我们阅读代码的时候,主要是要关注差一点。事无巨细的 UML 类图和时序图太容易把一些琐碎的细节也列上去了。

有实验环境来测试嘛,当然是好的。在阅读 RPC 实现的过程中,虽然我没有把相关的逻辑抽出来做实验,但是测试覆盖率高的项目,其单元测试和可执行的 example 本身就是良好的实验场地。单元测试可能是我见过的的理解一块代码意图的方式之一了。

总的来说,本文展示的是在初步了解一个方向的代码编写常识后,针对某一功能点进行主题阅读和对比整理的过程。在一开始阅读代码的时候,可以先针对某个特定的实现,先把它的逻辑理顺,等到对问题的抽象和解决方案的抽象有一定的感觉之后再进行对比阅读,有的放矢,快速沉淀总结。


分享好友

分享这个小栈给你的朋友们,一起进步吧。

Flink专区
创建时间:2020-06-19 13:29:19
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • ?
    专家
戳我,来吐槽~