我将在一个更大的文件中执行以下操作。现在,我有一个具有以下值的示例输入文件。
1000,SMITH,JERRY
1001,JOHN,TIA
1002,TWAIN,MARK
1003,HARDY,DENNIS
1004,CHILD,JACK
1005,CHILD,NORTON
1006,DAVIS,JENNY
1007,DAVIS,KAREN
1008,MIKE,JOHN
1009,DENNIS,SHERIN
现在我正在做的是运行一个mapduce作业来加密每条记录的姓氏并写回输出。我使用mapper分区号作为键,修改后的文本作为值。
所以mapper的输出将是,
0 1000,Mj4oJyk=,,JERRY
0 1001,KzwpPQ,TIA
0 1002,NSQgOi8,MARK
0 1003,KTIzNzg,DENNIS
0 1004,IjsoPyU,JACK
0 1005,IjsoPyU,NORTON
0 1006,JTI3OjI,JENNY
0 1007,JTI3OjI,KAREN
0 1008,LDoqNg,JOHN
0 1009,JTYvPSgg,SHERIN
我不想做任何排序。我也使用减速机,因为,在一个更大的文件的情况下,将有多个映射器,如果没有减速机,将写入多个输出文件。所以我使用单个减少来合并来自所有映射器的值并写入单个文件。现在输入到减速机的值以相反的顺序和映射器的顺序出现。它像下面这样,
1009,JTYvPSgg,SHERIN
1008,LDoqNg==,JOHN
1007,JTI3OjI=,KAREN
1006,JTI3OjI=,JENNY
1005,IjsoPyU=,NORTON
1004,IjsoPyU=,JACK
1003,KTIzNzg=,DENNIS
1002,NSQgOi8=,MARK
1001,KzwpPQ==,TIA
1000,Mj4oJyk=,JERRY
为什么它会颠倒顺序?我如何从mapper保持相同的顺序?任何建议都会有帮助
编辑1:
司机代码是,
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJobName("encrypt");
job.setJarByClass(TestDriver.class);
job.setMapperClass(TestMap.class);
job.setNumReduceTasks(1);
job.setReducerClass(TestReduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(hdfsInputPath));
FileOutputFormat.setOutputPath(job, new Path(hdfsOutputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
映射器代码是,
inputValues = value.toString().split(",");
stringBuilder = new StringBuilder();
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
// the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
mask(inputvalues);
context.write(new IntWritable(partition), new Text(stringBuilder.toString()));
减速机代码是,
for(Text value : values) {
context.write(new Text(value), null);
}
MapReduce的基本思想是事情完成的顺序无关紧要。所以你不能(也不需要)控制
您唯一可以控制的是值在还原器中可用的迭代器中放置的顺序。
为此,您可以使用Object键
来维护值的顺序。LongWritable部分(或键)是文件中该行的位置(不是行号,而是从文件开始的位置)。您可以使用该部分来跟踪哪一行是第一行。
然后您的映射器代码将更改为
protected void map(Object key, Text value, Mapper<Object, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
inputValues = value.toString().split(",");
stringBuilder = new StringBuilder();
mask(inputValues);
// the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
context.write(new LongWritable(((LongWritable) key).get()), value);
}
注意:您可以将代码中的所有IntWritable
更改为LongWritable
,但要小心。
inputValues = value.toString().split(",");
stringBuilder = new StringBuilder();
TaskID taskId = context.getTaskAttemptID().getTaskID();
//preserve the number value for sorting
IntWritable idNumber = new IntWritable(Integer.parseInt(inputValue[0])
// the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
mask(inputvalues);
context.write(idNumber, new Text(stringBuilder.toString()));
我做了一些假设,因为你没有映射器的完整代码。由于toString()
输出,我假设inputValue
是一个字符串数组。数组的第一个值应该是您输入的数字值,但是它现在是一个字符串。您必须将数字转换回IntWritable
以匹配您的映射器发出的IntWritable, Text
。hadoop框架将按键排序,并且键的类型为IntWritable
,它将按升序排序。您提供的代码是使用任务ID和阅读APIhttps://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapred/TaskAttemptID.html#getTaskID()不清楚这是否会根据您的需要为您的值提供顺序。为了控制输出顺序,我建议使用字符串数组的第一个值并转换为IntWritable。我不知道这是否违反了您屏蔽inputValue
的意图。
编辑
跟进您的评论。您可以简单地将分区
乘以-1
这将导致hadoop框架颠倒顺序。
int partition = -1*taskId.getId();