加载中...
MapReduce二次排序
发表于:2021-12-13 |

二次排序

自定义可比较类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package sort.secondary;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key for MapReduce secondary sort: sorts by {@code first},
 * then by {@code second}.
 *
 * <p>Overrides {@code equals}/{@code hashCode} consistently with
 * {@link #compareTo(IntPair)} — the default {@code HashPartitioner}
 * partitions by {@code key.hashCode()}, so without a stable hash,
 * equal pairs could be routed to different reducers.
 */
public class IntPair implements WritableComparable<IntPair> {

    // Primary sort field.
    private int first;
    // Secondary sort field (tie-breaker).
    private int second;

    /** No-arg constructor required by Hadoop serialization (readFields). */
    public IntPair() {
    }

    public IntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public String toString() {
        return "IntPair{" +
                "first=" + first +
                ", second=" + second +
                '}';
    }

    /** Orders by {@code first}, then {@code second}; avoids subtraction overflow. */
    @Override
    public int compareTo(IntPair o) {
        int byFirst = Integer.compare(first, o.first);
        return byFirst != 0 ? byFirst : Integer.compare(second, o.second);
    }

    /** Equality consistent with {@link #compareTo(IntPair)}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof IntPair)) {
            return false;
        }
        IntPair other = (IntPair) obj;
        return first == other.first && second == other.second;
    }

    /** Stable hash so equal keys hit the same partition. */
    @Override
    public int hashCode() {
        return 31 * first + second;
    }

    /** Serializes both fields in a fixed order; must mirror {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    /** Deserializes fields in the same order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }
}

Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package sort.secondary;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map extends
Mapper<LongWritable, Text, IntPair, NullWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(" ");
int left = Integer.parseInt(line[0]);
int right = Integer.parseInt(line[1]);
context.write(new IntPair(left,right),NullWritable.get());
}
}

Reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package sort.secondary;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce extends
Reducer<IntPair, NullWritable, IntPair, NullWritable> {
public void reduce(IntPair key, IntWritable values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}

Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package sort.secondary;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver for the secondary-sort example: wires the {@link Map} /
 * {@link Reduce} classes, clears any stale output directory, and submits
 * the job.
 */
public class Driver {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Input: a local Windows file; output: an HDFS directory.
        Path inputPath = new Path("file:///D:\\1.txt");
        Path outPath = new Path("hdfs://localhost:9000/user/secondary/output");

        // Delete a pre-existing output directory — the job fails otherwise.
        FileSystem fileSystem = outPath.getFileSystem(conf);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        Job job = Job.getInstance(conf, "Secondary Sort Practice");
        // NOTE(review): setJar (explicit path) takes precedence over
        // setJarByClass; both are kept for compatibility with the original
        // setup, but the hard-coded path only works on the author's machine.
        job.setJar("C:\\Users\\SSR\\IdeaProjects\\test\\out\\artifacts\\test_jar\\test.jar");
        job.setJarByClass(Driver.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Map output types (consumed by the shuffle).
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Final (reducer) output types — previously unset, leaving the
        // framework defaults, which do not match what Reduce emits.
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8

Output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
IntPair{first=1, second=2}
IntPair{first=3, second=4}
IntPair{first=5, second=6}
IntPair{first=7, second=8}
IntPair{first=7, second=82}
IntPair{first=12, second=211}
IntPair{first=20, second=21}
IntPair{first=20, second=53}
IntPair{first=20, second=522}
IntPair{first=31, second=42}
IntPair{first=40, second=511}
IntPair{first=50, second=51}
IntPair{first=50, second=52}
IntPair{first=50, second=53}
IntPair{first=50, second=53}
IntPair{first=50, second=54}
IntPair{first=50, second=62}
IntPair{first=50, second=512}
IntPair{first=50, second=522}
IntPair{first=60, second=51}
IntPair{first=60, second=52}
IntPair{first=60, second=53}
IntPair{first=60, second=56}
IntPair{first=60, second=56}
IntPair{first=60, second=57}
IntPair{first=60, second=57}
IntPair{first=60, second=61}
IntPair{first=63, second=61}
IntPair{first=70, second=54}
IntPair{first=70, second=55}
IntPair{first=70, second=56}
IntPair{first=70, second=57}
IntPair{first=70, second=58}
IntPair{first=70, second=58}
IntPair{first=71, second=55}
IntPair{first=71, second=56}
IntPair{first=73, second=57}
IntPair{first=74, second=58}
IntPair{first=203, second=21}
IntPair{first=530, second=54}
IntPair{first=730, second=54}
IntPair{first=740, second=58}
上一篇:
SQL注入实战笔记
下一篇:
Hive查询进阶
本文目录
本文目录