博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark源码分析之Worker
阅读量:7118 次
发布时间:2019-06-28

本文共 4359 字,大约阅读时间需要 14 分钟。

hot3.png

Spark源码分析之Worker

 支持三种模式的部署:YARN、Standalone以及Mesos。本篇说到的Worker只有在Standalone模式下才有。Worker节点是的工作节点,用于执行提交的作业。我们先从Worker节点的启动开始介绍。

  中Worker的启动有多种方式,但是最终调用的都是org.apache.spark.deploy.worker.Worker类,启动Worker节点的时候可以传很多的参数:内存、核、工作目录等。如果你不知道如何传递,没关系,help一下即可:

01 [wyp@iteblog spark]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker -h
02 Spark assembly has been built with Hive, including Datanucleus jars on classpath
03 Usage: Worker [options] <master>
04  
05 Master must be a URL of the form spark://hostname:port
06  
07 Options:
08   -c CORES, --cores CORES  Number of cores to use
09   -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
10   -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
11   -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
12   -h HOST, --host HOST     Hostname to listen on
13   -p PORT, --port PORT     Port to listen on (default: random)
14   --webui-port PORT        Port for web UI (default8081)

  从上面的输出我们可以看出Worker的启动支持多达7个参数!这样每个都这样输入岂不是很麻烦?其实,我们不用担心,Worker节点启动地时候将先读取conf/spark-env.sh里面的配置,这些参数配置的解析都是由Worker中的WorkerArguments类进行解析的。如果你没有设置内存,那么将会把Worker启动所在机器的所有内存(会预先留下1G内存给)分给Worker,具体的代码实现如下:

01 def inferDefaultMemory(): Int = {
02     val ibmVendor = System.getProperty("java.vendor").contains("IBM")
03     var totalMb = 0
04     try {
05       val bean = ManagementFactory.getOperatingSystemMXBean()
06       if (ibmVendor) {
07         val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
08         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
09         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 1024).toInt
10       else {
11         val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
12         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
13         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 1024).toInt
14       }
15     catch {
16       case e: Exception => {
17         totalMb = 2*1024
18         System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
19       }
20     }
21     // Leave out 1 GB for the operating system, but don't return a negative memory size
22     math.max(totalMb - 1024512)
23   }

  同样,如果你没设置cores,那么Spark将会获取你机器的所有可用的核作为参数传进去。解析完参数之后,将运行preStart函数,进行一些启动相关的操作,比如判断是否已经向Master注册过,创建工作目录,启动Worker的WEB UI,向Master进行注册等操作,如下:

01 override def preStart() {
02   assert(!registered)
03   logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
04     host, port, cores, Utils.megabytesToString(memory)))
05   logInfo("Spark home: " + sparkHome)
06   createWorkDir()
07   context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
08   webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
09   webUi.bind()
10   registerWithMaster()
11  
12   metricsSystem.registerSource(workerSource)
13   metricsSystem.start()
14 }

  Worker向Master注册的超时时间为20秒,如果在这20秒内没有成功地向Master注册,那么将会进行重试,重试的次数为3,如过重试的次数大于等于3,那么将无法启动Worker,这时候,你就该看看你的网络环境或者你的Master是否存在问题了。

Worker在运行的过程中将会触发许多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker进行不同的操作。比如,如果需要运行一个作业,Worker将会启动一个或多个ExecutorRunner,具体的代码可参见receiveWithLogging函数:

01 override def receiveWithLogging = {
02     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
03  
04     case SendHeartbeat =>
05     case WorkDirCleanup =>
06  
07     case MasterChanged(masterUrl, masterWebUiUrl) =>
08  
09     case Heartbeat =>
10      
11     case RegisterWorkerFailed(message) =>
12      
13     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_=>
14       
15     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
16        
17     case KillExecutor(masterUrl, appId, execId) =>
18        
19     case LaunchDriver(driverId, driverDesc) => {
20       
21  
22     case KillDriver(driverId) => {
23  
24     case DriverStateChanged(driverId, state, exception) => {
25       
26  
27     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
28     
29     case RequestWorkerState => {
30   }

  上面的代码是经过处理的,其实receiveWithLogging 方法是从ActorLogReceive继承下来的。

  当Worker节点Stop的时候,将会执行postStop函数,如下:

1 override def postStop() {
2   metricsSystem.report()
3   registrationRetryTimer.foreach(_.cancel())
4   executors.values.foreach(_.kill())
5   drivers.values.foreach(_.kill())
6   webUi.stop()
7   metricsSystem.stop()
8 }

  杀掉所有还未执行完的executors、drivers等,操作。这方法也是从Actor继承下来的。

  本文只是简单地介绍了Worker节点的一些环境,启动等相关的代码,关于它如何和Master通信;如何启动Executor;如何启动Driver都没有涉及,如果你想更好地了解Worker的运行情况,请参见Worker相关的代码吧。

 

转载于:https://my.oschina.net/xiaominmin/blog/1597214

你可能感兴趣的文章
RabbitMQ学习总结(7)——Spring整合RabbitMQ实例
查看>>
模糊查询,多条件查询
查看>>
java JNI 实现原理 (二) Linux 下如何 load JNILibrary
查看>>
内联函数和函数重载
查看>>
<%%>与<%=%>区别
查看>>
Python 中的 if __name__ == '__main__'
查看>>
C++11 并发指南六(atomic 类型详解一 atomic_flag 介绍)
查看>>
kuangbin专题一 简单搜索
查看>>
第二章 单表查询 T-SQL语言基础(2)
查看>>
chrome总是崩溃
查看>>
POJ-1751 Highways---确定部分边的MST
查看>>
在oracle中如何退出edit模式
查看>>
Java类和对象初始化
查看>>
AutoFac文档4(转载)
查看>>
STM8S003 上升沿进不去外部中断问题
查看>>
[LeetCode] Longest Palindromic Substring
查看>>
css
查看>>
BZOJ[2780][Spoj]8093 Sevenk Love Oimaster 后缀数组
查看>>
递推置换,交换次数最少得到升序序列
查看>>
js 倒计时
查看>>