Spark中的顶层RPC环境架构及总结
RpcEndpoint
RpcEndpoint是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint,
具体构成比较简单,receive()方法用于处理RpcEndpointRef.send() 和 RpcCallContext.reply()发送的消息,receiveAndReply()则是用于RpcEndpointRef.ask()发送的消息。其他主要是提供了一些待实现的回调函数
- onError()
- onConnected()
- onDisconnected()
- onNetworkError()
- onStart()
- onStop()
无论是receive()还是receiveAndReply()都是返回一个偏函数,在方法体中用模式匹配来将消息用不同的回调函数处理
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
RpcEndpoint一般通过匿名内部类和已经实现好的子类来使用,下图是一些子类,其中DummyMaster用于测试。

ThreadSafeRpcEndpoint是继承自RpcEndpoint的特质,主要用于对消息的处理,必须是线程安全的场景。ThreadSafeRpcEndpoint对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息,如下图所示,其中TestRpcEndPoint用于测试。

RpcEndpointRef
远程RpcEndpoint引用,用于消息发送方持有并发送消息,如下图所示。

对于特质RpcEndpointRef来说,有以下属性用来保证消息投递的一致性,at-most-once; at-least-once; exactly-once
maxRetries:最大尝试连接次数。可以通过spark.rpc.numRetries参数指定,默认3次retryWaitMs:每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait,默认3秒defaultAskTimeout:RPC ask操作的超时时间。可以通过spark.rpc.askTimeout,默认120秒
RpcEndpointRef中还定义了一些未实现的方法,用于发送消息
send():发送单向异步的消息,发完即忘语义ask():**发送消息到相应的RpcEndpoint.receiveAndReply(), 并返回Future**以在默认超时或者自定义超时内接收返回值askSync():与ask()类似,不过askSync()直接返回接收消息并且是阻塞的方法
RpcEndpointRef 存在唯一实现类 NettyRpcEndpointRef,重写的ask()和send(),首先将message封装为RequestMessage然后再调用NettyRpcEnv的ask()和send()方法。
RpcEnv
表示RPC环境,只有唯一子类NettyRpcEnv。 包含以下重要成员对象
transportContext:TransportContextstreamManager:NettyStreamManagerdispatcher:Dispatcherserver:TransportServerclientFactory: 用于构造发送和接收响应的TransportClient。private val clientFactory = transportContext.createClientFactory(createClientBootstraps())fileDownloadFactory: 用于下载文件的TransportClient。@volatile private var fileDownloadFactory: TransportClientFactory = _outboxes: 远端RPC地址与Outbox的映射关系,使用ConcurrentHashMap保证线程安全
包含以下重要方法
startServer(): 通过TransportContext的createServer()方法创建TransportServer。向Dispatcher注册RpcEndpointVerifier。RpcEndpointVerifier用于校验指定名称的RpcEndpoint是否存在。RpcEndpointVerifier在Dispatcher中的注册名为endpoint-verifierdef startServer(bindAddress: String, port: Int): Unit = { val bootstraps: java.util.List[TransportServerBootstrap] = if (securityManager.isAuthenticationEnabled()) { java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager)) } else { java.util.Collections.emptyList() } server = transportContext.createServer(bindAddress, port, bootstraps) dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) }ask(): 询问- 如果请求消息的接收者的地址与当前
NettyRpcEnv的地址相同,将消息交给dispatcher的postLocalMessage()方法处理,并传入成功和失败时的回调函数 - 如果请求消息的接收者的地址与当前
NettyRpcEnv的地址不同,则将消息序列化与回调函数一起封装为RpcOutboxMessage放入outbox,将消息投递出去 - 设定定时器,并返回请求结果
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = { val promise = Promise[Any]() val remoteAddr = message.receiver.address def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } } } def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") } } try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) } val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " + s"in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) promise.future.onComplete { v => timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) } promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) }- 如果请求消息的接收者的地址与当前
send(): 发送消息。与ask()类似,都是本地消息交于Inbox,远程消息交于outboxprivate[netty] def send(message: RequestMessage): Unit = { val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. try { dispatcher.postOneWayMessage(message) } catch { case e: RpcEnvStoppedException => logDebug(e.getMessage) } } else { // Message to a remote RPC endpoint. postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this))) } }postToOutbox(): 消息投递到远端结点- 如果
receiver.client不为空,那么消息将直接通过TransportClient发送到远端节点 - 如果
receiver.client为空,则获取远端结点地址对应的Outbox,若没有则新建一个 - 如果
NettyRpcEnv已经停止,移除该Outbox并停止,否则调用Outbox.send()发送消息
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = { if (receiver.client != null) { message.sendWith(receiver.client) } else { require(receiver.address != null, "Cannot send message to client endpoint with no listen address.") val targetOutbox = { val outbox = outboxes.get(receiver.address) if (outbox == null) { val newOutbox = new Outbox(this, receiver.address) val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox) if (oldOutbox == null) { newOutbox } else { oldOutbox } } else { outbox } } if (stopped.get) { // It's possible that we put `targetOutbox` after stopping. So we need to clean it. outboxes.remove(receiver.address) targetOutbox.stop() } else { targetOutbox.send(message) } } }- 如果
总结
如下图所示,客户端发送请求的流程图,左侧右侧分别表示两个不同节点上的NettyRpcEnv
- 通过调用
NettyRpcEndpointRef的send()和ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点,所以直接调用Dispatcher的postLocalMessage()或postOneWayMessage()方法将消息放入EndpointData内部Inbox的messages中。MessageLoop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。 - 通过调用
NettyRpcEndpointRef的send()和ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的messages中。 - 每个
Outbox的drainOutbox()方法通过循环,不断从messages列表中取得OutboxMessage,并通过TransportClient发送,底层依赖Netty。 TransportClient和远端NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler将消息分发给TransportRequestHandler,最终会调用NettyRpcHandler或StreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法,之后与第一步所述一致,调用Dispatcher的postRemoteMessage()或``postOneWayMessage()`方法。- 如果
TransportRequestHandler处理的是RpcRequest,那么server端的TransportRequestHandler处理消息时还会对client端进行响应,依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。


REFERENCE
- spark 源码分析
- Spark内核设计的艺术:架构设计与实现
文档信息
- 本文作者:wzx
- 本文链接:https://wzx140.github.io//2020/09/04/RpcEnv/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)