副标题[/!--empirenews.page--]
技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战
前言
这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。

比如SparkEnv里有个属性叫closureSerializer,是专门做任务的序列化反序列化的,当然也负责对函数闭包的序列化反序列化。我们看看内部是怎么实现的:
- val serializer = instantiateClassFromConf[Serializer](
- "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
- logDebug(s"Using serializer: ${serializer.getClass}")
-
- val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-
- val closureSerializer = new JavaSerializer(conf)
-
- val envInstance = new SparkEnv(
- .....
- closureSerializer, ....
这里直接new了一个JavaSerializer,并不能做配置。如果不改源码,你没有任何办法可以替换掉掉这个实现。同理,如果我想替换掉Executor的实现,基本也是不可能的。
今年有两个大地方涉及到了对Spark的【魔改】,也就是不通过改源码,使用原有发型包,通过添加新代码的方式来对Spark进行增强。
二层RPC的支持
我们知道,在Spark里,我们只能通过Task才能touch到Executor。现有的API你是没办法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都加载一个资源文件,现在是没办法做到的。为了能够对Executor进行直接的操作,那就需要建立一个新的通讯层。那具体怎么做呢?
首先,在Driver端建立一个Backend,这个比较简单,
- class PSDriverBackend(sc: SparkContext) extends Logging {
-
- val conf = sc.conf
- var psDriverRpcEndpointRef: RpcEndpointRef = null
-
- def createRpcEnv = {
- val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
- val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
- val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
- var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt
- val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
- Some(CryptoStreamUtils.createKey(sc.conf))
- } else {
- None
- }
- logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
- var createSucess = false
- var count = 0
- val env = new AtomicReference[RpcEnv]()
- while (!createSucess && count < 10) {
- try {
- env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
- sc.env.securityManager, clientMode = !isDriver))
- createSucess = true
- } catch {
- case e: Exception =>
- logInfo("fail to create rpcenv", e)
- count += 1
- port += 1
- }
- }
- if (env.get() == null) {
- logError(s"fail to create rpcenv finally with attemp ${count} ")
- }
- env.get()
- }
-
- def start() = {
- val env = createRpcEnv
- val pSDriverBackend = new PSDriverEndpoint(sc, env)
- psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend)
- }
-
- }
(编辑:帝国网站管理系统)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|