java中整体MR工作机制是怎样的

本篇内容主要讲解“java中整体MR工作机制是怎样的”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java中整体MR工作机制是怎样的”吧!

公司专注于为企业提供成都做网站、网站设计、微信公众号开发、电子商务商城网站建设小程序定制开发,软件按需求定制网站等一站式互联网企业服务。凭借多年丰富的经验,我们会仔细了解各客户的需求而做出多方面的分析、设计、整合,为客户设计出具风格及创意性的商业解决方案,创新互联公司更提供一系列网站制作和网站推广的服务。

1、整体MR工作机制源码解读(job提交流程)

1.1、job提交流程

--以wordCount案例为例,进行断点调试
1、在WordCountDriver类中的job.waitForCompletion(true);处打上断点(入口),以debug模式运行
	a.在Configuration conf = new Configuration();conf中做的操作是读取所有相关的配置文件
	b.并将job对象创建出来,通过--Job job = Job.getInstance(conf);完成

java中整体MR工作机制是怎样的

2、进入waitForCompletion()方法
if (state == JobState.DEFINE) {       // --确定job的当前状态,如果是state,则进行提交
      submit();
}
3、waitForCompletion()方法中的参数boolean verbose ~ verbose:true(默认值)
if (verbose) {
      monitorAndPrintJob();		--对当前的job进行监控,并打印job的信息
} 
4、进入submit()方法	~位置为Job.java~1562行
    --ensureState(JobState.DEFINE); 再次确认Job的状态
    --setUseNewAPI(); 设置使用新的API			--hadoop中提供了2套API
    --connect(); 明确当前提交的Job运行的环境是本地还是集群
4.1、进入connect()方法	--Job.java~1534行
    --cluster理解为当前job运行所需的一个环境对象,开始cluster为null,通过匿名内部类进行对象的创建
4.2	进入return new Cluster(getConfiguration())方法   	--Job.java~1540行
4.3 进入Cluster.java类,查看Cluster的有参构造			--Cluster.java~105行
4.4 进入initialize(jobTrackAddr, conf);方法,定位到initProviderList();//获取Job运行的环境列表
4.5 进入initProviderList()方法 	//获取job运行的环境列表		--Cluster.java~75行
4.5 查看Cluster.java类中的124行,查看遍历providerList有2种运行环境
	YarnClientProtocolProvider ==>集群环境
	LocalClientProtocolProvider==>本地环境
4.6 进入Cluster.java类130行,clientProtocol = provider.create(conf)方法,进入可以看到
4.7 YarnClientProtocolProvider.class 类19行

java中整体MR工作机制是怎样的

4.7 clientProtocol = null,继续向下走,可以看到下面的操作是对当前运行环境的判断
  	根据Provider结合当前的conf判断是哪个环境
	YarnClientProtocolProvider ==>  YarnRunner 	--yarn的运行对象
	LocalClientProtocolProvider==>  LocalJobRunner	--本地的运行对象
	
5、connect()执行完毕,继续向下执行,看Job.java 1565行,   //构造job的提交器对象
	 final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), 
	 this.cluster.getClient());	--1565行,使用的是当前构造器的文件系统对象及客户端对象
	 
6、继续向下走,到Job.java的1570行,该行代码才是真正进行job的提交
return submitter.submitJobInternal(Job.this, cluster); 通过JobSubmitter提交Job

7、job的状态转变为可执行,this.state = Job.JobState.RUNNING;   --Job.java类的1573行

8、从1570行打断点进入方法,进入JobSubmitter.java类中,定位到该类139行,submitJobInternal()方法,
向下走断点

9、定位到 checkSpecs(job); 方法,用于校验输出路径进入该方法

java中整体MR工作机制是怎样的

10、进入到output.checkOutputSpeces(job),查看源码   --进入到FileOutPutFormat.java类中,定位到151行。
由此可以得到的一个结果是:输出路径的校验是在job提交之前完成的

java中整体MR工作机制是怎样的

11、跳出checkSpecs(job);方法,继续向下走		--JobSub
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//获取Job临时工作目录   --D:/tmp/hadoop/mapred/staging/Administrator1590679188/.staging

12、继续向下走,定位到代码157行,submitClient.getNewJobId();  //获取提交的job的jobId
 	JobID jobId = submitClient.getNewJobID();	
//jobId=job_local11590679188_001本地模式下,我们知道每个job都有对应一个jobId,不管程序在本地还是yarn
 	
13、Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //生成Job提交路径
--D:/tmp/hadoop/mapred/staging/Administrator1590679188/.staging/job_local11590679188_001 job

14、copyAndConfigureFiles(job, submitJobDir);
//拷贝Job相关的配置信息,并将job的提交路径在磁盘中创建出来

java中整体MR工作机制是怎样的

15、进入uploadResourcesInternal(job,submitJobDir);方法 从JobSubmitter类99行进入

java中整体MR工作机制是怎样的

16、进入uploadResourcesInternal(job,submitJobDir)方法,读取配置项

java中整体MR工作机制是怎样的 java中整体MR工作机制是怎样的

17、进入writeSplits(job,submitJobDir);方法 writeSplits(job, submitJobDir); //生成切片信息

java中整体MR工作机制是怎样的

18、定位到 maps = writeNewSplits(job, jobSubmitDir); ,进入该方法 //生成切片进入

java中整体MR工作机制是怎样的

切片对象splits内容为: file:///D:/input/inputWord/JaneEyre.txt:0+36306679 (文件,读取位置从0到36306679) 切片是逻辑上的说法,记录的就是读取文件从什么位置到什么位置

19、切片对象splits中记录的内容是:读取的是那个文件,从文件的0位置开始读取到那个位置

java中整体MR工作机制是怎样的

20、return array.length; //返回切片的个数回到200行位置,将writeSplits(job, submitJobDir)返回的数值赋给maps

21、conf.setInt(MRJobConfig.NUM_MAPS, maps); //根据切片的个数设置启动多少个MapTask 并最终在job的提交路径中有两个文件:

--job.split 切片具体信息

--job.splitmetainfo 切片描述信息

java中整体MR工作机制是怎样的

22、writeConf(conf, submitJobFile); //把job的所有配置信息写到job的提交路径下 最终在job的提交路径下生成一个文件:job.xml。该文件记录所有的xml配置信息(包括自己设置的)

java中整体MR工作机制是怎样的

23、根据切片信息(确定启动mapTask的个数)和配置信息,真正开始执行job的任务

24、status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); // 真正将job提交进行执行

java中整体MR工作机制是怎样的

到此,相信大家对“java中整体MR工作机制是怎样的”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


标题名称:java中整体MR工作机制是怎样的
网站URL:http://ybzwz.com/article/gsjoce.html