Low-Level Stream Handling and Message Transport in Spark RPC
StreamManager
Fetches individual chunks from a stream. It is used in TransportRequestHandler to serve fetchChunk() requests, and has two subclasses: OneForOneStreamManager and NettyStreamManager.
OneForOneStreamManager
Provides one-for-one stream service to NettyBlockRpcServer. A ManagedBuffer is an abstraction over an immutable byte array.
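As a quick illustration, the following minimal sketch wraps an in-memory byte array with NioManagedBuffer, one of ManagedBuffer's concrete implementations (the string content is made up):

```scala
import java.nio.ByteBuffer
import org.apache.spark.network.buffer.NioManagedBuffer

val buf = new NioManagedBuffer(ByteBuffer.wrap("hello".getBytes("UTF-8")))
println(buf.size())              // 5: number of bytes in the buffer
val in = buf.createInputStream() // expose the same bytes as an InputStream
```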
The inner class StreamState maintains the state of a single stream, as shown in the code below:
- `appId`: the id of the application that requested the stream
- `buffers`: an Iterator of ManagedBuffer, the buffered data of the current stream
- `associatedChannel`: the channel associated with the current stream
- `chunksBeingTransferred`: the number of ManagedBuffers currently being transferred
- `curChunk`: the index of the ManagedBuffer the client is currently receiving, used to verify that the caller requests chunks in order and only one at a time
```java
private static class StreamState {
  final String appId;
  final Iterator<ManagedBuffer> buffers;
  final Channel associatedChannel;

  // Index of the chunk the client is currently receiving; ensures chunks are
  // requested one at a time and in order.
  int curChunk = 0;

  // Number of chunks being transferred and not finished yet.
  volatile long chunksBeingTransferred = 0L;

  StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
    this.appId = appId;
    this.buffers = Preconditions.checkNotNull(buffers);
    this.associatedChannel = channel;
  }
}
```
OneForOneStreamManager has the following important member fields:
- `nextStreamId`: the id of the next stream; an AtomicLong guarantees safety under concurrent access.
- `streams`: the mapping from stream id to StreamState; a ConcurrentHashMap<Long, StreamState> guarantees thread safety.
Its important methods are listed below:
- `registerStream()`: registers a stream of ManagedBuffers together with its associated channel (a usage sketch follows this list).

```java
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
  long myStreamId = nextStreamId.getAndIncrement();
  streams.put(myStreamId, new StreamState(appId, buffers, channel));
  return myStreamId;
}
```

- `getChunk()`: fetches a single chunk wrapped as a ManagedBuffer. Once the stream reaches its end, the stream is removed.

```java
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
  StreamState state = streams.get(streamId);
  if (chunkIndex != state.curChunk) {
    throw new IllegalStateException(String.format(
        "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
  } else if (!state.buffers.hasNext()) {
    throw new IllegalStateException(String.format(
        "Requested chunk index beyond end %s", chunkIndex));
  }
  state.curChunk += 1;
  ManagedBuffer nextChunk = state.buffers.next();

  if (!state.buffers.hasNext()) {
    logger.trace("Removing stream id {}", streamId);
    streams.remove(streamId);
  }

  return nextChunk;
}
```
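To make the contract concrete, here is a minimal usage sketch. The app id, the in-memory buffers, and the EmbeddedChannel test channel are all made up for illustration:

```scala
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import io.netty.channel.embedded.EmbeddedChannel
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.server.OneForOneStreamManager

val manager = new OneForOneStreamManager()
val chunks: Seq[ManagedBuffer] = Seq(
  new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](1, 2))),
  new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](3, 4))))

// Register the two buffers as one stream, bound to a Netty test channel.
val streamId = manager.registerStream("demo-app", chunks.iterator.asJava, new EmbeddedChannel())

manager.getChunk(streamId, 0) // ok: first chunk
manager.getChunk(streamId, 1) // ok: last chunk; the stream is removed afterwards
// A non-sequential index, or any further request on this stream id, would fail:
// curChunk advances by one per call, enforcing the one-at-a-time, in-order contract.
```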
NettyStreamManager
Provides file stream service for NettyRpcEnv, supporting the registration and download of plain files, jar packages, and directories. TransportRequestHandler depends on NettyStreamManager to handle StreamRequest messages, so each Executor node can use the `NettyStreamManager` exposed by the Driver's RpcEnv to download jars or files from the Driver for task execution. A rough usage sketch follows.
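In this sketch, the file paths and the rpcEnv value are hypothetical, and fileServer (the RpcEnvFileServer that NettyStreamManager implements) is a private[spark] API:

```scala
import java.io.File

// Driver side: register local artifacts; the returned spark:// URIs are what
// executors later send back in StreamRequest messages to fetch the bytes.
val confUri = rpcEnv.fileServer.addFile(new File("/tmp/app.conf"))
val jarUri  = rpcEnv.fileServer.addJar(new File("/tmp/app.jar"))
// Executor side: downloading such a URI opens a stream that
// TransportRequestHandler serves through NettyStreamManager.openStream().
```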
RpcHandler
Handles the request messages in TransportRequestHandler. Below we focus on its implementation class, NettyRpcHandler.
- `internalReceive()`: wraps the received ByteBuffer into a RequestMessage. The remote address is obtained from the TransportClient, and the ByteBuffer is deserialized while the RequestMessage is constructed. If the message carries no sender address, a new RequestMessage is built using the client address previously obtained from the TransportClient; if it does carry one, a RemoteProcessConnected message is posted to the Inboxes the first time that remote address is seen.

```scala
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
  val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
  assert(addr != null)
  val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  val requestMessage = RequestMessage(nettyEnv, client, message)
  if (requestMessage.senderAddress == null) {
    // Create a new message with the socket address of the client as the sender.
    new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
  } else {
    // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for
    // the listening address
    val remoteEnvAddress = requestMessage.senderAddress
    if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
      dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
    }
    requestMessage
  }
}

private[netty] object RequestMessage {

  private def readRpcAddress(in: DataInputStream): RpcAddress = {
    val hasRpcAddress = in.readBoolean()
    if (hasRpcAddress) {
      RpcAddress(in.readUTF(), in.readInt())
    } else {
      null
    }
  }

  def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
    val bis = new ByteBufferInputStream(bytes)
    val in = new DataInputStream(bis)
    try {
      val senderAddress = readRpcAddress(in)
      val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF())
      val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
      ref.client = client
      new RequestMessage(
        senderAddress,
        ref,
        // The remaining bytes in `bytes` are the message content.
        nettyEnv.deserialize(client, bytes))
    } finally {
      in.close()
    }
  }
}

// NettyRpcEnv.deserialize()
private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = {
  NettyRpcEnv.currentClient.withValue(client) {
    deserialize { () =>
      javaSerializerInstance.deserialize[T](bytes)
    }
  }
}
```

- `receive()`: handles an RPC message sent by a TransportClient. Under the hood, the message is handed to the Dispatcher, which puts it into the target endpoint's Inbox. The overload with an RpcResponseCallback is eventually handled by RpcEndpoint.receiveAndReply(), while the one-way overload is handled by RpcEndpoint.receive() (see the toy endpoint sketch after this list).

```scala
// Eventually handled by RpcEndpoint.receiveAndReply()
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}

// Eventually handled by RpcEndpoint.receive()
override def receive(
    client: TransportClient,
    message: ByteBuffer): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postOneWayMessage(messageToDispatch)
}
```

- `getStreamManager()`: returns the StreamManager which, as described in the previous section, serves individual chunks.
- `channelActive()`: posts a RemoteProcessConnected message to the Inboxes.
- `channelInactive()`: posts a RemoteProcessDisconnected message to the Inboxes.
- `exceptionCaught()`: posts a RemoteProcessConnectionError message to the Inboxes.
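To make the dispatch targets concrete, here is a toy endpoint sketch. The class name is hypothetical, and since RpcEndpoint is private[spark], a real implementation must live under the org.apache.spark package:

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint, for illustration only.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // One-way messages: NettyRpcHandler.receive(client, message)
  //   -> dispatcher.postOneWayMessage -> Inbox -> here
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"one-way: $msg")
  }

  // Messages expecting a reply: NettyRpcHandler.receive(client, message, callback)
  //   -> dispatcher.postRemoteMessage -> Inbox -> here; context.reply() feeds the callback
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(msg.toUpperCase)
  }
}
```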