Last month I attended a Spark training 📒, but with no big-data background I understood almost none of it. Two weeks ago I bought *Hadoop: The Definitive Guide* and *Programming Hive* 📚 to study big data properly. Then I got stuck for ages on the very first example, the weather-data 🌡 one, so today I worked through it and am writing it up 📝.
Preparing the Data
Downloading the data from the US 🇺🇸 weather center is slow, and the raw files still need preprocessing, which is a hassle. So I simply wrote ✍🏻 a script that generates a file 📃 already in the final format I want. The Python script:
```python
import random
import datetime

start = 1900
end = 1950
file_path = "/Users/chenxii/Documents/BigData/climate-data/"


def generate_temperature():
    # random reading in (-50, 50), truncated to one decimal place;
    # positive values get an explicit '+' prefix
    temp = (random.random() - 0.5) * 100
    temp = int(temp * 10) / 10
    if temp > 0:
        return "+" + str(temp)
    else:
        return temp


def generate_file1(start_year, end_year, save_path):
    file_name = 'temp.txt'
    file = open(save_path + file_name, 'w')
    for year in range(start_year, end_year):
        date = datetime.date(int(year), 1, 1)
        end_date = datetime.date(int(year), 12, 31)
        while date <= end_date:
            temp = generate_temperature()
            file.write(str(date) + " " + str(temp) + '\n')
            date += datetime.timedelta(days=1)
        file.flush()
    file.close()


if __name__ == '__main__':
    generate_file1(start, end, file_path)
```
Make sure the directory exists, then run the script with the `python` command. The generated data has one record per day: characters 0 through 3 are the year, character 11 is the '+' or '-' sign, and characters 12 to the end are the temperature value. A reading of 0 carries no '+' or '-' sign.
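A quick way to sanity-check those offsets is to slice a sample line (the record below is illustrative, just something in the same shape the script emits):

```python
# An illustrative record in the generator's format: "YYYY-MM-DD <signed temp>"
line = "1900-01-01 +23.4"

year = line[0:4]           # characters 0-3: the year
sign = line[11]            # character 11: '+' or '-' (absent only when the value is 0)
value = float(line[11:])   # sign and digits parse together

print(year, sign, value)
```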
Upload the file to HDFS with `hdfs dfs -put temp.txt /user/hadoop/climate/temp.txt`.
Writing the Java Program
Create a Maven project, package it as a jar, and upload it to the virtual machine. The code is as follows:
```java
package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4);
        int temperature = 0;
        if ('+' == line.charAt(11) || '-' == line.charAt(11)) {
            // Parse from the sign onward (index 11) so negative readings stay
            // negative; scale by 10 to keep one decimal place in an int.
            double tmp = Double.parseDouble(line.substring(11)) * 10;
            temperature = (int) tmp;
        }
        context.write(new Text(year), new IntWritable(temperature));
    }
}
```
```java
package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
```
```java
package finqo.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

public class MaxTemperature extends Configured implements Tool {
    // Tool#run is not actually used here; the job is wired up directly in main().
    public int run(String[] strings) throws Exception {
        return 0;
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("mapred.jar", "flinqo-hadoop-1.0.jar");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Build the jar with `mvn package` and upload it. The pom needs to include:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinqo.hadoop</groupId>
    <artifactId>flinqo-hadoop</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Without this plugin, running the jar will fail with a "main class not found" error.
Running the Program
Run the job with:
```shell
hadoop jar flinqo-hadoop-1.0.jar /user/hadoop/climate/temp.txt /user/hadoop/result
```
When the job finishes, check the results (the output path is a directory, so cat the files inside it):

```shell
hdfs dfs -cat /user/hadoop/result/*
```
The output:
```
1900	499
1901	495
1902	499
1903	498
1904	493
1905	499
1906	498
1907	494
1908	499
1909	497
1910	499
1911	497
1912	499
1913	493
1914	498
1915	497
1916	497
1917	499
1918	493
1919	498
1920	499
1921	499
1922	499
1923	497
1924	499
1925	498
1926	499
1927	499
1928	499
1929	498
1930	498
1931	499
1932	497
1933	495
1934	499
1935	498
1936	498
1937	498
1938	496
1939	497
1940	495
1941	499
1942	497
1943	499
1944	499
1945	499
1946	499
1947	499
1948	499
1949	497
```
The values look large because the mapper scaled every temperature up by 10; divide by 10 to get the actual reading (e.g. 499 is 49.9).
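To make the scaling concrete, here is a tiny round-trip check of that ×10 encoding (the helper names are mine, purely for illustration):

```python
def encode(reading):
    # store a one-decimal reading as an integer, scaled by 10 (as the mapper does)
    return int(reading * 10)

def decode(stored):
    # divide by 10 to recover the original reading
    return stored / 10

print(encode(49.9))   # the 499-style values seen in the job output
print(decode(499))
```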