
Writing a Spark Program in Scala to Compute the Top-N Dwell Times of Mobile Users per Base Station

Configuring a Spark 0.9 Cluster on CentOS 6.4

Spark is a fast, general-purpose compute cluster framework. Its core is written in Scala, and it provides high-level APIs in Scala, Java, and Python that make it very easy to develop parallel applications.
Below, we set up a Spark cluster computing environment and walk through some simple examples to get a feel for computing with Spark. Whether installing and starting the environment or writing processing code (in Scala; the shell that Spark provides by default accepts Scala code directly for data processing), everything feels much simpler than the Hadoop MapReduce framework, and Spark interoperates well with HDFS (reading data from HDFS and writing data back to it).

Installation and Configuration

  • Download, install, and configure Scala

wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz
tar xvzf scala-2.10.3.tgz

Append the environment variable SCALA_HOME to ~/.bashrc and make it take effect:

export SCALA_HOME=/usr/scala/scala-2.10.3
export PATH=$PATH:$SCALA_HOME/bin

  • Download, install, and configure Spark

咱俩先是在主节点m一上布置斯Parker程序,然后将布署好的次序文件复制分发到集群的次第从结点上。下载解压缩:

wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-hadoop1.tgz
tar xvzf spark-0.9.0-incubating-bin-hadoop1.tgz

Append the environment variable SPARK_HOME to ~/.bashrc and make it take effect:

export SPARK_HOME=/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
export PATH=$PATH:$SPARK_HOME/bin

Configure Spark on m1 by editing the spark-env.sh configuration file:

cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/conf
cp spark-env.sh.template spark-env.sh

In that script, also set SCALA_HOME to the path it actually points to on your system, for example:

export SCALA_HOME=/usr/scala/scala-2.10.3

Edit the conf/slaves file and add the hostnames of the compute nodes, one per line, for example:

s1
s2
s3

Finally, copy the Spark program and configuration files to each worker node:

scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 [email protected]:~/cloud/programs/
scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 [email protected]:~/cloud/programs/
scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 [email protected]:~/cloud/programs/

Starting the Spark Cluster

We will use data stored on the HDFS cluster as the input for computation, so the Hadoop cluster has to be installed, configured, and started successfully first; I am using Hadoop 1.2.1 here. Starting the Spark compute cluster is very easy, just run the following commands:

cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/
sbin/start-all.sh

能够见见,在m一上运转了一个名号为Master的进度,在s一上运营了贰个称号为Worker的经过,如下所示,作者那里也运转了Hadoop集群:
主节点m1上:

54968 SecondaryNameNode
55651 Master
55087 JobTracker
54814 NameNode

On worker node s1:

33592 Worker
33442 TaskTracker
33336 DataNode

Whether each process started successfully can also be diagnosed from the logs, for example:

On the master:
tail -100f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.master.Master-1-m1.out
On a worker:
tail -100f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.worker.Worker-1-s1.out

Verifying Spark Cluster Computation

We use the access log of my own website as the demo input; sample lines:

27.159.254.192 - - [21/Feb/2014:11:40:46 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
120.43.4.206 - - [21/Feb/2014:10:37:37 +0800] "GET /archives/417.html HTTP/1.1" 200 11464 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"

We count how often each IP address appears in this file to verify that the Spark cluster computes correctly. We read the log file from HDFS, compute the IP address frequencies, and finally save the result to a specified directory in HDFS.
First, start the Spark shell, which is used to submit computation jobs:

bin/spark-shell

In the Spark shell, code must be written in Scala. Then run the IP address frequency count by executing the following code in the Spark shell:

val file = sc.textFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log")
val result = file.flatMap(line => line.split("\\s+.*")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
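The regex passed to split is worth a note: "\\s+.*" matches from the first whitespace character to the end of the line, so splitting a log line leaves only the leading IP address. A minimal local sketch of the same flatMap/map/reduceByKey logic on plain Scala collections (the third, shortened log line is invented for illustration; this runs without a cluster):

```scala
// split("\\s+.*") matches from the first whitespace to end-of-line,
// so splitting a log line keeps only the leading IP address.
val lines = List(
  "27.159.254.192 - - [21/Feb/2014:11:40:46 +0800] \"GET /archives/526.html HTTP/1.1\" 200 12080",
  "120.43.4.206 - - [21/Feb/2014:10:37:37 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11464",
  "27.159.254.192 - - [21/Feb/2014:12:01:02 +0800] \"GET / HTTP/1.1\" 200 512"
)
// Local-collections analogue of flatMap(...).map(w => (w, 1)).reduceByKey(_ + _):
val counts: Map[String, Int] =
  lines.flatMap(_.split("\\s+.*")).groupBy(identity).map { case (ip, hits) => (ip, hits.size) }
// counts("27.159.254.192") == 2
```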

The file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log above is the input log file. Log output from the processing run looks like this:

14/03/06 21:59:22 INFO MemoryStore: ensureFreeSpace(784) called with curMem=43296, maxMem=311387750
14/03/06 21:59:22 INFO MemoryStore: Block broadcast_11 stored as values to memory (estimated size 784.0 B, free 296.9 MB)
14/03/06 21:59:22 INFO FileInputFormat: Total input paths to process : 1
14/03/06 21:59:22 INFO SparkContext: Starting job: collect at <console>:13
14/03/06 21:59:22 INFO DAGScheduler: Registering RDD 84 (reduceByKey at <console>:13)
14/03/06 21:59:22 INFO DAGScheduler: Got job 10 (collect at <console>:13) with 1 output partitions (allowLocal=false)
14/03/06 21:59:22 INFO DAGScheduler: Final stage: Stage 20 (collect at <console>:13)
14/03/06 21:59:22 INFO DAGScheduler: Parents of final stage: List(Stage 21)
14/03/06 21:59:22 INFO DAGScheduler: Missing parents: List(Stage 21)
14/03/06 21:59:22 INFO DAGScheduler: Submitting Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13), which has no missing parents
14/03/06 21:59:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13)
14/03/06 21:59:22 INFO TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
14/03/06 21:59:22 INFO TaskSetManager: Starting task 21.0:0 as TID 19 on executor localhost: localhost (PROCESS_LOCAL)
14/03/06 21:59:22 INFO TaskSetManager: Serialized task 21.0:0 as 1941 bytes in 0 ms
14/03/06 21:59:22 INFO Executor: Running task ID 19
14/03/06 21:59:22 INFO BlockManager: Found block broadcast_11 locally
14/03/06 21:59:22 INFO HadoopRDD: Input split: hdfs://m1:9000/user/shirdrn/wwwlog20140222.log:0+4179514
14/03/06 21:59:23 INFO Executor: Serialized size of result for 19 is 738
14/03/06 21:59:23 INFO Executor: Sending result for 19 directly to driver
14/03/06 21:59:23 INFO TaskSetManager: Finished TID 19 in 211 ms on localhost (progress: 0/1)
14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 21.0 from pool
14/03/06 21:59:23 INFO DAGScheduler: Completed ShuffleMapTask(21, 0)
14/03/06 21:59:23 INFO DAGScheduler: Stage 21 (reduceByKey at <console>:13) finished in 0.211 s
14/03/06 21:59:23 INFO DAGScheduler: looking for newly runnable stages
14/03/06 21:59:23 INFO DAGScheduler: running: Set()
14/03/06 21:59:23 INFO DAGScheduler: waiting: Set(Stage 20)
14/03/06 21:59:23 INFO DAGScheduler: failed: Set()
14/03/06 21:59:23 INFO DAGScheduler: Missing parents for Stage 20: List()
14/03/06 21:59:23 INFO DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13), which is now runnable
14/03/06 21:59:23 INFO DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13)
14/03/06 21:59:23 INFO TaskSchedulerImpl: Adding task set 20.0 with 1 tasks
14/03/06 21:59:23 INFO Executor: Finished task ID 19
14/03/06 21:59:23 INFO TaskSetManager: Starting task 20.0:0 as TID 20 on executor localhost: localhost (PROCESS_LOCAL)
14/03/06 21:59:23 INFO TaskSetManager: Serialized task 20.0:0 as 1803 bytes in 0 ms
14/03/06 21:59:23 INFO Executor: Running task ID 20
14/03/06 21:59:23 INFO BlockManager: Found block broadcast_11 locally
14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
14/03/06 21:59:23 INFO Executor: Serialized size of result for 20 is 19423
14/03/06 21:59:23 INFO Executor: Sending result for 20 directly to driver
14/03/06 21:59:23 INFO TaskSetManager: Finished TID 20 in 17 ms on localhost (progress: 0/1)
14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 20.0 from pool
14/03/06 21:59:23 INFO DAGScheduler: Completed ResultTask(20, 0)
14/03/06 21:59:23 INFO DAGScheduler: Stage 20 (collect at <console>:13) finished in 0.016 s
14/03/06 21:59:23 INFO SparkContext: Job finished: collect at <console>:13, took 0.242136929 s
14/03/06 21:59:23 INFO Executor: Finished task ID 20
res14: Array[(String, Int)] = Array((27.159.254.192,28), (120.43.9.81,40), (120.43.4.206,16), (120.37.242.176,56), (64.31.25.60,2), (27.153.161.9,32), (202.43.145.163,24), (61.187.102.6,1), (117.26.195.116,12), (27.153.186.194,64), (123.125.71.91,1), (110.85.106.105,64), (110.86.184.182,36), (27.150.247.36,52), (110.86.166.52,60), (175.98.162.2,20), (61.136.166.16,1), (46.105.105.217,1), (27.150.223.49,52), (112.5.252.6,20), (121.205.242.4,76), (183.61.174.211,3), (27.153.230.35,36), (112.111.172.96,40), (112.5.234.157,3), (144.76.95.232,7), (31.204.154.144,28), (123.125.71.22,1), (80.82.64.118,3), (27.153.248.188,160), (112.5.252.187,40), (221.219.105.71,4), (74.82.169.79,19), (117.26.253.195,32), (120.33.244.205,152), (110.86.165.8,84), (117.26.86.172,136), (27.153.233.101,8), (123.12...

You can see part of the result computed by the map and reduce steps.
Finally, to save the result to HDFS, just enter the following code:

result.saveAsTextFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log.result")

Check the result data on HDFS:

[[email protected] ~]$ hadoop fs -cat /user/shirdrn/wwwlog20140222.log.result/part-00000 | head -5
(27.159.254.192,28)
(120.43.9.81,40)
(120.43.4.206,16)
(120.37.242.176,56)
(64.31.25.60,2)

Reference

http://www.bkjia.com/Linux/954765.html

1. Requirement: compute the Top-N dwell times from mobile base station logs

Our phones can do mobile communication because there are huge numbers of base stations all over the country. As soon as a phone is switched on it tries to establish a connection with a nearby base station, and every connect and disconnect is recorded in the logs on the mobile operator's base station servers.

Although we do not know a phone user's exact position, the base station's location lets us roughly infer the geographic area the user is in, and a company can then use that location information to serve targeted recommendations and ads.

To keep things easy to follow, we roughly simulated some mobile-user log data on the base stations. Each record has four fields: phone number, timestamp, base station id, and connection type (1 means connect, 0 means disconnect).

 

User log for base station A (file 19735E1C66.log):

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

 

User log for base station B (file DDE7970F68.log):

18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0
18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0

 

User log for base station C (file E549D940E0.log):

18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0

 

Below is the base station table data (file loc_info.txt), with four fields: base station id, longitude, latitude, and the signal's radiation type (signal grade, e.g. 2G, 3G, or 4G):

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

 

Based on the data above, we need to compute, for each phone number, the 2 base station locations (longitude and latitude) with the longest accumulated dwell time.

The approach:

(1) Read the log data and split the fields;
(2) Reorganize the fields: pack (phone number, base station id) as the key and the time as the value into a Tuple;
(3) Aggregate by key, accumulating the times;
(4) Repack the data with the base station id as key and (phone number, time) as value, ready for the later join with the base station table;
(5) Read the base station data and split the fields;
(6) Reorganize the fields: pack the base station id as key and the station's longitude and latitude as value into a Tuple;
(7) Join the two Tuples;
(8) Group the joined result by phone number;
(9) Convert each group to a List, sort it by time, reverse it, and finally take the top 2;
(10) Write the result to HDFS.
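The sign trick in step (2) is worth spelling out: a connect event (type 1) contributes its timestamp negated, a disconnect event (type 0) contributes it positively, so summing per (phone, station) key gives disconnect minus connect. Note the timestamps are raw yyyyMMddHHmmss numbers, so the sum is an hhmmss-style score rather than true seconds, but it still ranks dwell times consistently within a day. A minimal plain-Scala sketch using the base station A sample records (no Spark needed):

```scala
// Events for 18688888888 at base station A, copied from the sample log:
// type "1" = connect (timestamp negated), "0" = disconnect (kept positive).
val events = List(
  ("18688888888", 20160327082400L, "16030401EAFB68F1E3CDF819735E1C66", "1"),
  ("18688888888", 20160327170000L, "16030401EAFB68F1E3CDF819735E1C66", "0")
)
// ((mobile, stationId), signedTime), then sum per key — the local
// collections analogue of the reduceByKey step.
val dwell: Map[(String, String), Long] = events
  .map { case (mobile, ts, lac, tp) => ((mobile, lac), if (tp == "1") -ts else ts) }
  .groupBy(_._1)
  .map { case (key, vs) => (key, vs.map(_._2).sum) }
// 20160327170000 - 20160327082400 = 87600 (08:24:00 -> 17:00:00 in hhmmss arithmetic)
```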

 

2. Preparing the test data


1) The mobile-user logs (the three .log files above):

2) The base station data (the loc_info.txt file above):

 

3. The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuebusi</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>

        <!-- Centralized management of dependency versions -->
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <!-- Scala language core library -->
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <!-- Spark core -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <!-- Hadoop client, used to access HDFS -->
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>

            <plugins>
                <!-- scala-maven-plugin: Maven plugin that compiles Scala code -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- maven-compiler-plugin: Maven plugin that compiles Java code -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <!-- Configuration for the Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Configuration for the Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- maven-shade-plugin: Maven plugin that builds the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

 

4. Writing the Scala program

Create an object named MobileLocation under src/main/scala/:


Complete code of MobileLocation.scala:

package com.xuebusi.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Compute, for each phone number, the 2 base stations with the longest dwell time.
  * Created by SYJ on 2017/1/24.
  */
object MobileLocation {

  def main(args: Array[String]) {
    /**
      * Create the SparkConf.
      *
      * A few notes:
      * To make Debug testing in IDEA easy, we set local mode here,
      * i.e. run the Spark program locally. One caveat with this
      * approach: when running on Windows and reading data from HDFS
      * on Linux, an exception may be thrown, because reading the data
      * relies on some Windows native libraries.
      *
      * The same problem occurs when running MapReduce programs from
      * Eclipse on Windows, but not on Linux or macOS.
      *
      * Hadoop compression and decompression use native libraries
      * written in C or C++, and such libraries are not cross-platform,
      * so to debug MapReduce programs on Windows you must install the
      * native libraries first.
      *
      * A Linux virtual machine with a desktop environment, installed
      * on Windows, is recommended; debugging there works without issues.
      */
    //run locally
    val conf: SparkConf = new SparkConf().setAppName("MobileLocation").setMaster("local")
    //create a SparkConf that runs on the cluster (the default)
    //val conf: SparkConf = new SparkConf().setAppName("MobileLocation")

    //create the SparkContext
    val sc: SparkContext = new SparkContext(conf)

    //read the data from the file system
    val lines: RDD[String] = sc.textFile(args(0))

    /**
      * Split the data.
      * This version uses two map calls, which is not recommended;
      * we can do it all in a single map
      */
    //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), arr(3)))

    //split the data in a single map and assemble it into tuples
    val splited = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val mobile: String = fields(0)
      //val time: Long = fields(1).toLong
      val lac: String = fields(2)
      val tp: String = fields(3)
      val time: Long = if(tp == "1") -fields(1).toLong else fields(1).toLong

      //assemble the fields into a tuple and return it
      //((phone number, base station id), time)
      ((mobile, lac), time)
    })

    //aggregate by key
    val reduced: RDD[((String, String), Long)] = splited.reduceByKey(_+_)

    //reshape into tuples, ready for the join with the base station table
    val lacAndMobieTime = reduced.map(x => {
      //(base station id, (phone number, time))
      (x._1._2, (x._1._1, x._2))
    })

    //read the base station data
    val lacInfo: RDD[String] = sc.textFile(args(1))

    //split the data and prepare for the join
    val splitedLacInfo = lacInfo.map(line => {
      val fields: Array[String] = line.split(",")
      val id: String = fields(0)//base station id
      val x: String = fields(1)//station longitude
      val y: String = fields(2)//station latitude
      /**
        * Return the data.
        * Only key-value data can be joined, so we return a tuple
        * keyed by base station id, with the station's coordinates
        * as the value:
        *   (base station id, (longitude, latitude))
        */
      (id, (x, y))
    })

    //join
    //returns: RDD[(base station id, ((phone number, time), (longitude, latitude)))]
    val joined: RDD[(String, ((String, Long), (String, String)))] = lacAndMobieTime.join(splitedLacInfo)
    //ArrayBuffer((CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))), (CC0710CC94ECC657A8561DE549D940E0,((18611132889,1900),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))
    //System.out.println(joined.collect().toBuffer)

    //group by phone number
    val groupedByMobile = joined.groupBy(_._2._1._1)
    //ArrayBuffer((18688888888,CompactBuffer((CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))), (18611132889,CompactBuffer((CC0710CC94ECC657A8561DE549D940E0,((18611132889,1900),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))))))
    //System.out.println(groupedByMobile.collect().toBuffer)

    /**
      * Convert to a List, sort by time, reverse, and take the top 2
      */
    val result = groupedByMobile.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2))
    //ArrayBuffer((18688888888,List((16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))), (18611132889,List((16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))))))
    //System.out.println(result.collect().toBuffer)

    //write the result to the file system
    result.saveAsTextFile(args(2))

    //release resources
    sc.stop()
  }
}
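The grouping and Top-2 selection at the end (steps (8) and (9)) can be mimicked on plain Scala collections; the records below are copied from the commented join output above for the number 18688888888:

```scala
// (stationId, ((mobile, totalTime), (longitude, latitude))) — the shape
// produced by the join step, for one phone number.
val joined = List(
  ("CC0710CC94ECC657A8561DE549D940E0", (("18688888888", 1300L), ("116.303955", "40.041935"))),
  ("16030401EAFB68F1E3CDF819735E1C66", (("18688888888", 87600L), ("116.296302", "40.032296"))),
  ("9F36407EAD0629FC166F14DDE7970F68", (("18688888888", 51200L), ("116.304864", "40.050645")))
)
// Group by mobile, sort each group by accumulated time, reverse, take the
// top 2 — the local analogue of groupBy + mapValues(_.toList.sortBy(...).reverse.take(2)).
val top2: Map[String, List[(String, ((String, Long), (String, String)))]] =
  joined.groupBy(_._2._1._1)
        .map { case (mobile, recs) => (mobile, recs.sortBy(_._2._1._2).reverse.take(2)) }
// top2("18688888888") keeps stations 16030401... (87600) and 9F36407... (51200)
```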

 

5. Local testing

Edit the run configuration:

 

Click the "+" button to add a configuration:

 

Select "Application":


 

Select the class containing the main method:

 

Fill in a name for the configuration, and in the Program arguments box enter three parameters: two input directories and one output directory:


 

Run the program locally:


 

An insufficient-memory exception is thrown:

17/01/24 17:17:58 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8. Please use a larger heap size.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:198)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:180)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
    at com.xuebusi.spark.MobileLocation$.main(MobileLocation.scala:37)
    at com.xuebusi.spark.MobileLocation.main(MobileLocation.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

 

This can be solved by adding one line of code to the program:
conf.set("spark.testing.memory", "536870912") // the value just needs to be larger than 512m

But that approach is not flexible enough, so here we pass an argument to the JVM instead. Edit the run configuration and add "-Xmx512m" or "-Dspark.testing.memory=536870912" to the VM options box, giving it 512 MB of memory:


 

Program log output and results:

 

6. Submitting to the Spark cluster

Running locally and submitting to the cluster differ slightly in code; one line must be changed:


 

将编辑好的次序行使Maven插件打成jar包:

Upload the jar to the Spark cluster server:


 

将测试所用的数额也上盛传HDFS集群:


 

Run the command:

/root/apps/spark/bin/spark-submit \
--master spark://hadoop01:7077,hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 7 \
--class com.xuebusi.spark.MobileLocation \
    /root/spark-1.0-SNAPSHOT.jar \
    hdfs://hadoop01:9000/mobile/input/mobile_logs \
    hdfs://hadoop01:9000/mobile/input/loc_logs \
    hdfs://hadoop01:9000/mobile/output

 

7. 程序运维时的输出日志


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/01/24 11:27:48 INFO SparkContext: Running Spark version 1.6.2
17/01/24 11:27:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/24 11:27:52 INFO SecurityManager: Changing view acls to: root
17/01/24 11:27:52 INFO SecurityManager: Changing modify acls to: root
17/01/24 11:27:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/01/24 11:27:54 INFO Utils: Successfully started service 'sparkDriver' on port 41762.
17/01/24 11:27:55 INFO Slf4jLogger: Slf4jLogger started
17/01/24 11:27:55 INFO Remoting: Starting remoting
17/01/24 11:27:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.71.11:49399]
17/01/24 11:27:55 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49399.
17/01/24 11:27:55 INFO SparkEnv: Registering MapOutputTracker
17/01/24 11:27:55 INFO SparkEnv: Registering BlockManagerMaster
17/01/24 11:27:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-6c4988a1-3ad2-49ad-8cc2-02390384792b
17/01/24 11:27:55 INFO MemoryStore: MemoryStore started with capacity 517.4 MB
17/01/24 11:27:56 INFO SparkEnv: Registering OutputCommitCoordinator
17/01/24 11:28:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/01/24 11:28:02 INFO SparkUI: Started SparkUI at http://192.168.71.11:4040
17/01/24 11:28:02 INFO HttpFileServer: HTTP File server directory is /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/httpd-34193396-8d9c-4ea7-9fca-5caf8c712f86
17/01/24 11:28:02 INFO HttpServer: Starting HTTP Server
17/01/24 11:28:02 INFO Utils: Successfully started service 'HTTP file server' on port 58135.
17/01/24 11:28:06 INFO SparkContext: Added JAR file:/root/spark-1.0-SNAPSHOT.jar at http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar with timestamp 1485286086076
17/01/24 11:28:06 INFO Executor: Starting executor ID driver on host localhost
17/01/24 11:28:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55934.
17/01/24 11:28:06 INFO NettyBlockTransferService: Server created on 55934
17/01/24 11:28:06 INFO BlockManagerMaster: Trying to register BlockManager
17/01/24 11:28:06 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55934 with 517.4 MB RAM, BlockManagerId(driver, localhost, 55934)
17/01/24 11:28:06 INFO BlockManagerMaster: Registered BlockManager
17/01/24 11:28:09 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 153.6 KB, free 153.6 KB)
17/01/24 11:28:09 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 167.5 KB)
17/01/24 11:28:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:55934 (size: 13.9 KB, free: 517.4 MB)
17/01/24 11:28:09 INFO SparkContext: Created broadcast 0 from textFile at MobileLocation.scala:40
17/01/24 11:28:12 INFO FileInputFormat: Total input paths to process : 3
17/01/24 11:28:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 86.4 KB, free 253.9 KB)
17/01/24 11:28:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 19.3 KB, free 273.2 KB)
17/01/24 11:28:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:55934 (size: 19.3 KB, free: 517.4 MB)
17/01/24 11:28:13 INFO SparkContext: Created broadcast 1 from textFile at MobileLocation.scala:73
17/01/24 11:28:13 INFO FileInputFormat: Total input paths to process : 1
17/01/24 11:28:13 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/01/24 11:28:13 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/01/24 11:28:13 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/01/24 11:28:13 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/01/24 11:28:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/01/24 11:28:14 INFO SparkContext: Starting job: saveAsTextFile at MobileLocation.scala:110
17/01/24 11:28:14 INFO DAGScheduler: Registering RDD 2 (map at MobileLocation.scala:50)
17/01/24 11:28:14 INFO DAGScheduler: Registering RDD 7 (map at MobileLocation.scala:76)
17/01/24 11:28:14 INFO DAGScheduler: Registering RDD 4 (map at MobileLocation.scala:67)
17/01/24 11:28:14 INFO DAGScheduler: Registering RDD 11 (groupBy at MobileLocation.scala:98)
17/01/24 11:28:14 INFO DAGScheduler: Got job 0 (saveAsTextFile at MobileLocation.scala:110) with 3 output partitions
17/01/24 11:28:14 INFO DAGScheduler: Final stage: ResultStage 4 (saveAsTextFile at MobileLocation.scala:110)
17/01/24 11:28:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
17/01/24 11:28:14 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3)
17/01/24 11:28:14 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[7] at map at MobileLocation.scala:76), which has no missing parents
17/01/24 11:28:14 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 KB, free 277.0 KB)
17/01/24 11:28:14 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.2 KB, free 279.2 KB)
17/01/24 11:28:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:55934 (size: 2.2 KB, free: 517.4 MB)
17/01/24 11:28:14 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/01/24 11:28:14 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[7] at map at MobileLocation.scala:76)
17/01/24 11:28:14 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/01/24 11:28:14 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at MobileLocation.scala:50), which has no missing parents
17/01/24 11:28:14 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 283.1 KB)
17/01/24 11:28:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 285.3 KB)
17/01/24 11:28:14 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, partition 0,ANY, 2210 bytes)
17/01/24 11:28:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:55934 (size: 2.2 KB, free: 517.4 MB)
17/01/24 11:28:14 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
17/01/24 11:28:14 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at MobileLocation.scala:50)
17/01/24 11:28:14 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
17/01/24 11:28:14 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
17/01/24 11:28:14 INFO Executor: Fetching http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar with timestamp 1485286086076
17/01/24 11:28:15 INFO Utils: Fetching http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar to /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/userFiles-1790c4be-0a0d-45fd-91f9-c66c49e765a5/fetchFileTemp1508611941727652704.tmp
17/01/24 11:28:19 INFO Executor: Adding file:/tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/userFiles-1790c4be-0a0d-45fd-91f9-c66c49e765a5/spark-1.0-SNAPSHOT.jar to class loader
17/01/24 11:28:19 INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/loc_logs/loc_info.txt:0+171
17/01/24 11:28:20 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 2255 bytes result sent to driver
17/01/24 11:28:20 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, localhost, partition 0,ANY, 2215 bytes)
17/01/24 11:28:20 INFO Executor: Running task 0.0 in stage 0.0 (TID 1)
17/01/24 11:28:20 INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/19735E1C66.log:0+248
17/01/24 11:28:20 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 5593 ms on localhost (1/1)
17/01/24 11:28:20 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/01/24 11:28:20 INFO DAGScheduler: ShuffleMapStage 1 (map at MobileLocation.scala:76) finished in 5.716 s
17/01/24 11:28:20 INFO DAGScheduler: looking for newly runnable stages
17/01/24 11:28:20 INFO DAGScheduler: running: Set(ShuffleMapStage 0)
17/01/24 11:28:20 INFO DAGScheduler: waiting: Set(ShuffleMapStage 2, ShuffleMapStage 3, ResultStage 4)
17/01/24 11:28:20 INFO DAGScheduler: failed: Set()
17/01/24 11:28:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 2255 bytes result sent to driver
17/01/24 11:28:20 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 2, localhost, partition 1,ANY, 2215 bytes)
17/01/24 11:28:20 INFO Executor: Running task 1.0 in stage 0.0 (TID 2)
17/01/24 11:28:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 401 ms on localhost (1/3)
17/01/24 11:28:20 INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/DDE7970F68.log:0+496
17/01/24 11:28:20 INFO Executor: Finished task 1.0 in stage 0.0 (TID 2). 2255 bytes result sent to driver
17/01/24 11:28:20 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 3, localhost, partition 2,ANY, 2215 bytes)
17/01/24 11:28:20 INFO Executor: Running task 2.0 in stage 0.0 (TID 3)
17/01/24 11:28:20 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 2) in 191 ms on localhost (2/3)
17/01/24 11:28:20 INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/E549D940E0.log:0+496
17/01/24 11:28:20 INFO Executor: Finished task 2.0 in stage 0.0 (TID 3). 2255 bytes result sent to driver
17/01/24 11:28:20 INFO DAGScheduler: ShuffleMapStage 0 (map at MobileLocation.scala:50) finished in 6.045 s
17/01/24 11:28:20 INFO DAGScheduler: looking for newly runnable stages
17/01/24 11:28:20 INFO DAGScheduler: running: Set()
17/01/24 11:28:20 INFO DAGScheduler: waiting: Set(ShuffleMapStage 2, ShuffleMapStage 3, ResultStage 4)
17/01/24 11:28:20 INFO DAGScheduler: failed: Set()
17/01/24 11:28:20 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 3) in 176 ms on localhost (3/3)
17/01/24 11:28:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/01/24 11:28:20 INFO DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[4] at map at MobileLocation.scala:67), which has no missing parents
17/01/24 11:28:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 288.3 KB)
17/01/24 11:28:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1800.0 B, free 290.0 KB)
17/01/24 11:28:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:55934 (size: 1800.0 B, free: 517.4 MB)
17/01/24 11:28:21 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
17/01/24 11:28:21 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[4] at map at MobileLocation.scala:67)
17/01/24 11:28:21 INFO TaskSchedulerImpl: Adding task set 2.0 with 3 tasks
17/01/24 11:28:21 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, partition 0,NODE_LOCAL, 1947 bytes)
17/01/24 11:28:21 INFO Executor: Running task 0.0 in stage 2.0 (TID 4)
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 17 ms
17/01/24 11:28:21 INFO Executor: Finished task 0.0 in stage 2.0 (TID 4). 1376 bytes result sent to driver
17/01/24 11:28:21 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, partition 1,NODE_LOCAL, 1947 bytes)
17/01/24 11:28:21 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 352 ms on localhost (1/3)
17/01/24 11:28:21 INFO Executor: Running task 1.0 in stage 2.0 (TID 5)
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
17/01/24 11:28:21 INFO Executor: Finished task 1.0 in stage 2.0 (TID 5). 1376 bytes result sent to driver
17/01/24 11:28:21 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 6, localhost, partition 2,NODE_LOCAL, 1947 bytes)
17/01/24 11:28:21 INFO Executor: Running task 2.0 in stage 2.0 (TID 6)
17/01/24 11:28:21 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 190 ms on localhost (2/3)
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
17/01/24 11:28:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:21 INFO Executor: Finished task 2.0 in stage 2.0 (TID 6). 1376 bytes result sent to driver
17/01/24 11:28:21 INFO DAGScheduler: ShuffleMapStage 2 (map at MobileLocation.scala:67) finished in 0.673 s
17/01/24 11:28:21 INFO DAGScheduler: looking for newly runnable stages
17/01/24 11:28:21 INFO DAGScheduler: running: Set()
17/01/24 11:28:21 INFO DAGScheduler: waiting: Set(ShuffleMapStage 3, ResultStage 4)
17/01/24 11:28:21 INFO DAGScheduler: failed: Set()
17/01/24 11:28:21 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 6) in 162 ms on localhost (3/3)
17/01/24 11:28:21 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
17/01/24 11:28:21 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[11] at groupBy at MobileLocation.scala:98), which has no missing parents
17/01/24 11:28:21 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.1 KB, free 294.2 KB)
17/01/24 11:28:22 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 296.2 KB)
17/01/24 11:28:22 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:55934 (size: 2.1 KB, free: 517.4 MB)
17/01/24 11:28:22 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
17/01/24 11:28:22 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[11] at groupBy at MobileLocation.scala:98)
17/01/24 11:28:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 3 tasks
17/01/24 11:28:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 7, localhost, partition 0,PROCESS_LOCAL, 2020 bytes)
17/01/24 11:28:22 INFO Executor: Running task 0.0 in stage 3.0 (TID 7)
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/24 11:28:22 INFO Executor: Finished task 0.0 in stage 3.0 (TID 7). 1376 bytes result sent to driver
17/01/24 11:28:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 8, localhost, partition 1,PROCESS_LOCAL, 2020 bytes)
17/01/24 11:28:22 INFO Executor: Running task 1.0 in stage 3.0 (TID 8)
17/01/24 11:28:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7) in 290 ms on localhost (1/3)
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:22 INFO Executor: Finished task 1.0 in stage 3.0 (TID 8). 1376 bytes result sent to driver
17/01/24 11:28:22 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 9, localhost, partition 2,PROCESS_LOCAL, 2020 bytes)
17/01/24 11:28:22 INFO Executor: Running task 2.0 in stage 3.0 (TID 9)
17/01/24 11:28:22 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 8) in 191 ms on localhost (2/3)
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/01/24 11:28:22 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
17/01/24 11:28:22 INFO Executor: Finished task 2.0 in stage 3.0 (TID 9). 1376 bytes result sent to driver
17/01/24 11:28:22 INFO DAGScheduler: ShuffleMapStage 3 (groupBy at MobileLocation.scala:98) finished in 0.606 s
17/01/24 11:28:22 INFO DAGScheduler: looking for newly runnable stages
17/01/24 11:28:22 INFO DAGScheduler: running: Set()
17/01/24 11:28:22 INFO DAGScheduler: waiting: Set(ResultStage 4)
17/01/24 11:28:22 INFO DAGScheduler: failed: Set()
17/01/24 11:28:22 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 9) in 172 ms on localhost (3/3)
17/01/24 11:28:22 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
17/01/24 11:28:22 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[14] at saveAsTextFile at MobileLocation.scala:110), which has no missing parents
17/01/24 11:28:23 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 66.4 KB, free 362.6 KB)
17/01/24 11:28:23 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 23.0 KB, free 385.6 KB)
17/01/24 11:28:24 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:55934 (size: 23.0 KB, free: 517.3 MB)
17/01/24 11:28:24 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
17/01/24 11:28:24 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 4 (MapPartitionsRDD[14] at saveAsTextFile at MobileLocation.scala:110)
17/01/24 11:28:24 INFO TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
17/01/24 11:28:24 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 10, localhost, partition 0,NODE_LOCAL, 1958 bytes)
17/01/24 11:28:24 INFO Executor: Running task 0.0 in stage 4.0 (TID 10)
17/01/24 11:28:24 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:24 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
17/01/24 11:28:26 INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000000_10' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000000
17/01/24 11:28:26 INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000000_10: Committed
17/01/24 11:28:26 INFO Executor: Finished task 0.0 in stage 4.0 (TID 10). 2080 bytes result sent to driver
17/01/24 11:28:26 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 11, localhost, partition 1,NODE_LOCAL, 1958 bytes)
17/01/24 11:28:26 INFO Executor: Running task 1.0 in stage 4.0 (TID 11)
17/01/24 11:28:26 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 10) in 1823 ms on localhost (1/3)
17/01/24 11:28:26 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:27 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:55934 in memory (size: 2.2 KB, free: 517.3 MB)
17/01/24 11:28:27 INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost:55934 in memory (size: 2.1 KB, free: 517.3 MB)
17/01/24 11:28:27 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:55934 in memory (size: 1800.0 B, free: 517.3 MB)
17/01/24 11:28:27 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:55934 in memory (size: 2.2 KB, free: 517.4 MB)
17/01/24 11:28:27 INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000001_11' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000001
17/01/24 11:28:27 INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000001_11: Committed
17/01/24 11:28:27 INFO Executor: Finished task 1.0 in stage 4.0 (TID 11). 2080 bytes result sent to driver
17/01/24 11:28:27 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 12, localhost, partition 2,NODE_LOCAL, 1958 bytes)
17/01/24 11:28:27 INFO Executor: Running task 2.0 in stage 4.0 (TID 12)
17/01/24 11:28:27 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 11) in 1675 ms on localhost (2/3)
17/01/24 11:28:27 INFO ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 3 blocks
17/01/24 11:28:27 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
17/01/24 11:28:28 INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000002_12' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000002
17/01/24 11:28:28 INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000002_12: Committed
17/01/24 11:28:28 INFO Executor: Finished task 2.0 in stage 4.0 (TID 12). 2080 bytes result sent to driver
17/01/24 11:28:28 INFO DAGScheduler: ResultStage 4 (saveAsTextFile at MobileLocation.scala:110) finished in 3.750 s
17/01/24 11:28:28 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 12) in 288 ms on localhost (3/3)
17/01/24 11:28:28 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
17/01/24 11:28:28 INFO DAGScheduler: Job 0 finished: saveAsTextFile at MobileLocation.scala:110, took 14.029200 s
17/01/24 11:28:28 INFO SparkUI: Stopped Spark web UI at http://192.168.71.11:4040
17/01/24 11:28:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/01/24 11:28:28 INFO MemoryStore: MemoryStore cleared
17/01/24 11:28:28 INFO BlockManager: BlockManager stopped
17/01/24 11:28:28 INFO BlockManagerMaster: BlockManagerMaster stopped
17/01/24 11:28:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/01/24 11:28:28 INFO SparkContext: Successfully stopped SparkContext
17/01/24 11:28:28 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/01/24 11:28:28 INFO ShutdownHookManager: Shutdown hook called
17/01/24 11:28:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e
17/01/24 11:28:28 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/01/24 11:28:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/httpd-34193396-8d9c-4ea7-9fca-5caf8c712f86
[root@hadoop01 ~]# 
8. Viewing the output

The job writes one part file per partition; here only part-00001 contains records, the other two part files are empty:
[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00000
[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00001
(18688888888,List((16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645)))))
(18611132889,List((16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645)))))
[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00002
[root@hadoop01 ~]#
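The output above is the grouped result written by `saveAsTextFile` (MobileLocation.scala:110 in the log): for each phone number, a list of (base station, ((phone, stay duration), (lng, lat))) entries. Since MobileLocation.scala itself is not shown, the following is only a minimal sketch of what the core computation might look like; the input field layout (phone, timestamp, stationId, eventType), the sign convention for computing durations, and the top-2 cutoff are all assumptions inferred from the log and the output:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reconstruction of MobileLocation.scala. Field positions and
// the connect/disconnect sign trick are assumptions, not the original code.
object MobileLocation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MobileLocation")
    val sc = new SparkContext(conf)

    // Mobile logs: phone,time,stationId,eventType (assumed 1 = connect, 0 = disconnect)
    val userAndTime = sc.textFile("hdfs://hadoop01:9000/mobile/input/mobile_logs")
      .map { line =>
        val f = line.split(",")
        val time = f(1).toLong
        // Connect events contribute -time, disconnect events +time, so
        // summing per (phone, station) yields the total stay duration.
        val signed = if (f(3) == "1") -time else time
        ((f(0), f(2)), signed)
      }
      .reduceByKey(_ + _)                         // ((phone, station), duration)

    // Base station info (loc_info.txt): stationId,lng,lat,...
    val stations = sc.textFile("hdfs://hadoop01:9000/mobile/input/loc_logs")
      .map { line =>
        val f = line.split(",")
        (f(0), (f(1), f(2)))                      // (station, (lng, lat))
      }

    // Join durations with coordinates, then keep the top-2 stations per user,
    // producing the (phone, List((station, ((phone, dur), (lng, lat))))) shape
    // seen in part-00001 above.
    val topN = userAndTime
      .map { case ((phone, station), dur) => (station, (phone, dur)) }
      .join(stations)
      .groupBy { case (_, ((phone, _), _)) => phone }
      .mapValues(_.toList.sortBy { case (_, ((_, dur), _)) => -dur }.take(2))

    topN.saveAsTextFile("hdfs://hadoop01:9000/mobile/output")
    sc.stop()
  }
}
```

The `groupBy` at the end corresponds to the `groupBy at MobileLocation.scala:98` stage in the log, and explains why the final stage fetches from two separate shuffles (the duration aggregation and the station join) before writing the result.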