The data comes from NCDC, the U.S. National Climatic Data Center.
Here are five records from the source data:
    0029029070999991901010813004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00331+99999103201ADDGF108991999999999999999999
    0035029070999991901010820004+64333+023450FM-12+000599999V0202301N013919999999N0000001N9-00331+99999102991ADDGF108991999999999999999999MW1701
    0029029070999991901010906004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00501+99999102871ADDGF108991999999999999999999
    0029029070999991901010913004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999102661ADDGF108991999999999999999999
    0029029070999991901010920004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00281+99999102391ADDGF108991999999999999999999
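The record format is explained below, using the first record as the example. Positions are 1-based and inclusive:
Position | Value | Meaning |
---|---|---|
1-4 | 0029 | |
5-10 | 029070 | USAF weather station identifier |
11-15 | 99999 | WBAN weather station identifier |
16-23 | 19010108 | Observation date (yyyyMMdd) |
24-27 | 1300 | Observation time (HHmm) |
28 | 4 | |
29-34 | +64333 | Latitude (degrees × 1000) |
35-41 | +023450 | Longitude (degrees × 1000) |
42-46 | FM-12 | |
47-51 | +0005 | Elevation (meters) |
52-56 | 99999 | |
57-60 | V020 | |
61-63 | 230 | Wind direction (degrees) |
64 | 1 | Quality code |
65 | N | |
66-69 | 0118 | |
70 | 1 | Quality code |
71-75 | 99999 | Sky ceiling height (meters) |
76 | 9 | |
77 | 9 | |
78 | N | |
79-84 | 000000 | Visibility distance (meters) |
85 | 1 | Quality code |
86 | N | |
87 | 9 | |
88-92 | -0033 | Air temperature (degrees Celsius × 10) |
93 | 1 | Quality code |
94-98 | +9999 | Dew point temperature (degrees Celsius × 10) |
99 | 9 | Quality code |
100-104 | 10320 | Atmospheric pressure (hectopascals × 10) |
105 | 1 | Quality code |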
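To make the position table concrete, here is a minimal plain-Java sketch that decodes a few of the fields from the first sample record. The class `NcdcRecordFields` and its method names are hypothetical and are not part of the code from the book. It only assumes the positions listed in the table.

```java
import java.time.LocalDateTime;

// Hypothetical helper (not from the original post): decodes a few fixed-width
// fields of one NCDC record using the positions from the table above.
final class NcdcRecordFields {

    // Table positions are 1-based and inclusive; String.substring is 0-based
    // with an exclusive end, so positions from-to map to substring(from - 1, to).
    static String field(String record, int from, int to) {
        return record.substring(from - 1, to);
    }

    static String usafStation(String record) {
        return field(record, 5, 10);
    }

    static LocalDateTime observedAt(String record) {
        return LocalDateTime.of(
                Integer.parseInt(field(record, 16, 19)),  // year
                Integer.parseInt(field(record, 20, 21)),  // month
                Integer.parseInt(field(record, 22, 23)),  // day
                Integer.parseInt(field(record, 24, 25)),  // hour
                Integer.parseInt(field(record, 26, 27))); // minute
    }

    // Latitude and longitude are stored as degrees × 1000.
    // Integer.parseInt accepts the leading '+' on Java 7 and later.
    static double latitudeDegrees(String record) {
        return Integer.parseInt(field(record, 29, 34)) / 1000.0;
    }

    static double longitudeDegrees(String record) {
        return Integer.parseInt(field(record, 35, 41)) / 1000.0;
    }

    // Air temperature is stored as degrees Celsius × 10.
    static double airTemperatureCelsius(String record) {
        return Integer.parseInt(field(record, 88, 92)) / 10.0;
    }

    public static void main(String[] args) {
        String record = "0029029070999991901010813004+64333+023450FM-12+000599999"
                + "V0202301N011819999999N0000001N9-00331+99999103201ADDGF108991999999999999999999";
        System.out.println(usafStation(record));
        System.out.println(observedAt(record));
        System.out.println(latitudeDegrees(record));
        System.out.println(longitudeDegrees(record));
        System.out.println(airTemperatureCelsius(record));
    }
}
```

For the first sample record this gives station 029070, observed on 1901-01-08 at 13:00, at latitude 64.333 and longitude 23.45, with an air temperature of -3.3 degrees Celsius.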
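My code is the same as in the book. It uses the usual three-class structure (job driver, mapper, reducer), so let's go straight to the code.
Create a new MapReduce project.
Start with the job code: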
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MTJob {

        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: MTJob <input path> <output path>");
                System.exit(-1);
            }

            // Job.getInstance() replaces the deprecated new Job() constructor
            Job job = Job.getInstance();
            job.setJarByClass(MTJob.class);
            job.setJobName("Max temperature");

            // Input and output locations come from the command line
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MTMapper.class);
            job.setReducerClass(MTReducer.class);

            // Output types: (year, maximum temperature)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Block until the job finishes; exit code 0 on success
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
Next is the mapper:
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MTMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Value used in the data for a missing temperature reading
        private static final int MISSING = 9999;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Positions 16-19 (1-based): the year of the observation
            String year = line.substring(15, 19);
            // Positions 88-92: signed air temperature in tenths of a degree Celsius
            int airTemperature;
            if (line.charAt(87) == '+') {
                // Integer.parseInt rejected a leading plus sign before Java 7, so skip it
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            // Position 93: quality code of the temperature reading
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }
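The per-line logic of the mapper can be tried without a Hadoop cluster. The following is a small sketch that mirrors the same substring offsets and the same filter in plain Java. `TemperatureParser` and `validTemperature` are hypothetical names introduced only for illustration, and the two "bad" records in `main` are synthetic variations of the first sample record, not real data.

```java
import java.util.OptionalInt;

// Hypothetical stand-alone version of the mapper's parsing logic (no Hadoop needed).
final class TemperatureParser {
    private static final int MISSING = 9999;

    static String year(String line) {
        return line.substring(15, 19);
    }

    // Returns the temperature in tenths of a degree Celsius, or empty when the
    // reading is missing or its quality code is not one of 0, 1, 4, 5, 9.
    static OptionalInt validTemperature(String line) {
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            return OptionalInt.of(airTemperature);
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        String good = "0029029070999991901010813004+64333+023450FM-12+000599999"
                + "V0202301N011819999999N0000001N9-00331+99999103201ADDGF108991999999999999999999";
        // Synthetic records: same line with the temperature replaced by the
        // missing marker, and with the quality code replaced by 2.
        String missing = good.substring(0, 87) + "+9999" + good.substring(92);
        String badQuality = good.substring(0, 92) + "2" + good.substring(93);

        System.out.println(year(good) + " " + validTemperature(good));
        System.out.println(validTemperature(missing).isPresent());
        System.out.println(validTemperature(badQuality).isPresent());
    }
}
```

Only the first record passes the filter, which is why the mapper would emit a single `(1901, -33)` pair for these three lines.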
Finally, the reducer:
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MTReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }
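Taken together, the mapper and reducer above amount to "emit (year, temperature), group by year, keep the maximum". As a rough illustration of that flow, the sketch below runs the same steps in memory on the five sample records from the top of this post. `LocalMaxTemperature` is a hypothetical class and the in-memory grouping is only a stand-in for Hadoop's shuffle, not how Hadoop implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the map -> group -> reduce flow.
final class LocalMaxTemperature {
    static final String TAIL = "ADDGF108991999999999999999999";
    static final String[] SAMPLE = {
        "0029029070999991901010813004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00331+99999103201" + TAIL,
        "0035029070999991901010820004+64333+023450FM-12+000599999V0202301N013919999999N0000001N9-00331+99999102991" + TAIL + "MW1701",
        "0029029070999991901010906004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00501+99999102871" + TAIL,
        "0029029070999991901010913004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999102661" + TAIL,
        "0029029070999991901010920004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00281+99999102391" + TAIL,
    };

    // "Map": parse one line and, if the reading is valid, file it under its year.
    static void map(String line, Map<String, List<Integer>> grouped) {
        String year = line.substring(15, 19);
        // Integer.parseInt accepts a leading '+' on Java 7 and later
        int temperature = Integer.parseInt(line.substring(87, 92));
        String quality = line.substring(92, 93);
        if (temperature != 9999 && quality.matches("[01459]")) {
            grouped.computeIfAbsent(year, y -> new ArrayList<>()).add(temperature);
        }
    }

    // "Reduce": keep the maximum of all values seen for one year.
    static int reduce(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int value : values) {
            maxValue = Math.max(maxValue, value);
        }
        return maxValue;
    }

    static Map<String, Integer> run(String[] lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>(); // stand-in for the shuffle
        for (String line : lines) {
            map(line, grouped);
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), reduce(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(SAMPLE)); // prints {1901=-28}
    }
}
```

The five sample readings are -33, -33, -50, -33 and -28 (tenths of a degree), so the maximum for 1901 in this tiny sample is -28, that is -2.8 degrees Celsius.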
Then export the project as a jar.

Check the temperature files to be processed. Here they are the files for 1901 and 1902.

Now the computation can be started by running the jar directly.


If the job information shown above appears, the run succeeded.
Next, look at the output.

This shows the result we are after. The values are in tenths of a degree Celsius, so 317 means 31.7 °C:
Year | Max temperature (°C × 10) |
---|---|
1901 | 317 |
1902 | 244 |