提问者:小点点

Hadoop Mapduce:减速器的值以相反的顺序排列


我将在一个更大的文件中执行以下操作。现在,我有一个具有以下值的示例输入文件。

1000,SMITH,JERRY
1001,JOHN,TIA
1002,TWAIN,MARK
1003,HARDY,DENNIS
1004,CHILD,JACK
1005,CHILD,NORTON
1006,DAVIS,JENNY
1007,DAVIS,KAREN
1008,MIKE,JOHN
1009,DENNIS,SHERIN

现在我正在做的是运行一个mapduce作业来加密每条记录的姓氏并写回输出。我使用mapper分区号作为键,修改后的文本作为值。

所以mapper的输出将是,

0   1000,Mj4oJyk=,,JERRY
0   1001,KzwpPQ,TIA
0   1002,NSQgOi8,MARK
0   1003,KTIzNzg,DENNIS
0   1004,IjsoPyU,JACK
0   1005,IjsoPyU,NORTON
0   1006,JTI3OjI,JENNY
0   1007,JTI3OjI,KAREN
0   1008,LDoqNg,JOHN
0   1009,JTYvPSgg,SHERIN

我不想做任何排序。我也使用减速机,因为,在一个更大的文件的情况下,将有多个映射器,如果没有减速机,将写入多个输出文件。所以我使用单个减少来合并来自所有映射器的值并写入单个文件。现在输入到减速机的值以相反的顺序和映射器的顺序出现。它像下面这样,

1009,JTYvPSgg,SHERIN
1008,LDoqNg==,JOHN
1007,JTI3OjI=,KAREN
1006,JTI3OjI=,JENNY
1005,IjsoPyU=,NORTON
1004,IjsoPyU=,JACK
1003,KTIzNzg=,DENNIS
1002,NSQgOi8=,MARK
1001,KzwpPQ==,TIA
1000,Mj4oJyk=,JERRY

为什么它会颠倒顺序?我如何从mapper保持相同的顺序?任何建议都会有帮助

编辑1:

司机代码是,

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
    job.setJobName("encrypt");
    job.setJarByClass(TestDriver.class);
    job.setMapperClass(TestMap.class);
    job.setNumReduceTasks(1);
    job.setReducerClass(TestReduce.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(hdfsInputPath));
    FileOutputFormat.setOutputPath(job, new Path(hdfsOutputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);

映射器代码是,

        inputValues = value.toString().split(",");
        stringBuilder = new StringBuilder();
        TaskID taskId = context.getTaskAttemptID().getTaskID();
        int partition = taskId.getId();

 // the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
        mask(inputvalues);
        context.write(new IntWritable(partition), new Text(stringBuilder.toString()));

减速机代码是,

       for(Text value : values) {
        context.write(new Text(value), null);
       }

共2个答案

匿名用户

MapReduce的基本思想是事情完成的顺序无关紧要。所以你不能(也不需要)控制

  • 输入记录通过映射器。
  • 键和相关值通过还原器。

您唯一可以控制的是值在还原器中可用的迭代器中放置的顺序。

为此,您可以使用Object键来维护值的顺序。LongWritable部分(或键)是文件中该行的位置(不是行号,而是从文件开始的位置)。您可以使用该部分来跟踪哪一行是第一行。

然后您的映射器代码将更改为

protected void map(Object key, Text value, Mapper<Object, Text, LongWritable, Text>.Context context)
        throws IOException, InterruptedException {
    inputValues = value.toString().split(",");
    stringBuilder = new StringBuilder();
    mask(inputValues);
    // the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
    context.write(new LongWritable(((LongWritable) key).get()), value);

}

注意:您可以将代码中的所有IntWritable更改为LongWritable,但要小心。

匿名用户

    inputValues = value.toString().split(",");
    stringBuilder = new StringBuilder();
    TaskID taskId = context.getTaskAttemptID().getTaskID();
    //preserve the number value for sorting
    IntWritable idNumber = new IntWritable(Integer.parseInt(inputValue[0])

    // the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format
    mask(inputvalues);
    context.write(idNumber, new Text(stringBuilder.toString()));

我做了一些假设,因为你没有映射器的完整代码。由于toString()输出,我假设inputValue是一个字符串数组。数组的第一个值应该是您输入的数字值,但是它现在是一个字符串。您必须将数字转换回IntWritable以匹配您的映射器发出的IntWritable, Text。hadoop框架将按键排序,并且键的类型为IntWritable,它将按升序排序。您提供的代码是使用任务ID和阅读APIhttps://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapred/TaskAttemptID.html#getTaskID()不清楚这是否会根据您的需要为您的值提供顺序。为了控制输出顺序,我建议使用字符串数组的第一个值并转换为IntWritable。我不知道这是否违反了您屏蔽inputValue的意图。

编辑

跟进您的评论。您可以简单地将分区乘以-1这将导致hadoop框架颠倒顺序。

int partition = -1*taskId.getId();