Java源码示例:org.apache.crunch.impl.mr.MRPipeline

示例1
@Test
public void testGeneric() throws IOException {
  String datasetName = tableName + ".TestGenericEntity";

  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schemaLiteral(testGenericEntity)
      .build();

  Dataset<GenericRecord> inputDataset = repo.create("default", "in", descriptor);
  Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);

  writeRecords(inputDataset, 10);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasetsHBase.class, HBaseTestUtils.getConf());
  PCollection<GenericRecord> data = pipeline.read(
      CrunchDatasets.asSource(inputDataset));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  checkRecords(outputDataset, 10, 0);
}
 
示例2
@Test
public void testSourceView() throws IOException {
  String datasetName = tableName + ".TestGenericEntity";

  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schemaLiteral(testGenericEntity)
      .build();

  Dataset<GenericRecord> inputDataset = repo.create("default", "in", descriptor);
  Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);

  writeRecords(inputDataset, 10);

  View<GenericRecord> inputView = inputDataset
      .from("part1", new Utf8("part1_2")).to("part1", new Utf8("part1_7"))
      .from("part2", new Utf8("part2_2")).to("part2", new Utf8("part2_7"));
  Assert.assertEquals(6, datasetSize(inputView));

  Pipeline pipeline = new MRPipeline(TestCrunchDatasetsHBase.class, HBaseTestUtils.getConf());
  PCollection<GenericRecord> data = pipeline.read(
      CrunchDatasets.asSource(inputView));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  checkRecords(outputDataset, 6, 2);
}
 
示例3
@Test
public void testGeneric() throws IOException {
  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  // write two files, each of 5 records
  writeTestUsers(inputDataset, 5, 0);
  writeTestUsers(inputDataset, 5, 5);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputDataset));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  checkTestUsers(outputDataset, 10);
}
 
示例4
@Test
public void testGenericParquet() throws IOException {
  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).format(Formats.PARQUET).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).format(Formats.PARQUET).build());

  // write two files, each of 5 records
  writeTestUsers(inputDataset, 5, 0);
  writeTestUsers(inputDataset, 5, 5);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputDataset));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  checkTestUsers(outputDataset, 10);
}
 
示例5
@Test
public void testPartitionedSource() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).format(Formats.PARQUET).build());

  writeTestUsers(inputDataset, 10);

  PartitionKey key = new PartitionKey(0);
  Dataset<Record> inputPart0 =
      ((PartitionedDataset<Record>) inputDataset).getPartition(key, false);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputPart0));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(5, datasetSize(outputDataset));
}
 
示例6
@Test
public void testPartitionedSourceAndTarget() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

  writeTestUsers(inputDataset, 10);

  PartitionKey key = new PartitionKey(0);
  Dataset<Record> inputPart0 =
      ((PartitionedDataset<Record>) inputDataset).getPartition(key, false);
  Dataset<Record> outputPart0 =
      ((PartitionedDataset<Record>) outputDataset).getPartition(key, true);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputPart0));
  pipeline.write(data, CrunchDatasets.asTarget(outputPart0), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(5, datasetSize(outputPart0));
}
 
示例7
@Test
public void testSourceView() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).format(Formats.PARQUET).build());

  writeTestUsers(inputDataset, 10);

  View<Record> inputView = inputDataset.with("username", "test-0");
  Assert.assertEquals(1, datasetSize(inputView));

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputView));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(1, datasetSize(outputDataset));
}
 
示例8
@Test
public void testTargetView() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

  writeTestUsers(inputDataset, 10);

  View<Record> inputView = inputDataset.with("username", "test-0");
  Assert.assertEquals(1, datasetSize(inputView));
  View<Record> outputView = outputDataset.with("username", "test-0");

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputView));
  pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(1, datasetSize(outputDataset));
}
 
示例9
@Test
public void testTargetViewProvidedPartition() throws IOException {
    PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().provided("version").build();

    Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
            .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
    Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
            .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

    View<Record> inputView = inputDataset.with("version", "test-version-0");

    writeTestUsers(inputView, 1);

    Assert.assertEquals(1, datasetSize(inputView));
    View<Record> outputView = outputDataset.with("version", "test-version-0");

    Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
    PCollection<GenericData.Record> data = pipeline.read(
            CrunchDatasets.asSource(inputView));
    pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
    pipeline.run();

    Assert.assertEquals(1, datasetSize(outputDataset));
}
 
示例10
@Test
public void testDatasetUris() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

  writeTestUsers(inputDataset, 10);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(new URIBuilder(repo.getUri(), "ns", "in").build(),
          GenericData.Record.class));
  pipeline.write(data, CrunchDatasets.asTarget(
      new URIBuilder(repo.getUri(), "ns", "out").build()), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(10, datasetSize(outputDataset));
}
 
示例11
@Test
public void testWriteModeOverwrite() throws IOException {
  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  writeTestUsers(inputDataset, 1, 0);
  writeTestUsers(outputDataset, 1, 1);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputDataset));
  pipeline.write(data, CrunchDatasets.asTarget((View<Record>) outputDataset),
      Target.WriteMode.OVERWRITE);

  pipeline.run();

  checkTestUsers(outputDataset, 1);
}
 
示例12
@Test
public void testMultipleFileReadingFromCrunch() throws IOException {
  Dataset<Record> inputDatasetA = repo.create("ns", "inA", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());
  Dataset<Record> inputDatasetB = repo.create("ns", "inB", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  // write two files, each of 5 records
  writeTestUsers(inputDatasetA, 5, 0);
  writeTestUsers(inputDatasetB, 5, 5);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> dataA = pipeline.read(
      CrunchDatasets.asSource(inputDatasetA));
  PCollection<GenericData.Record> dataB = pipeline.read(
      CrunchDatasets.asSource(inputDatasetB));
  pipeline.write(dataA.union(dataB), CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  checkTestUsers(outputDataset, 10);
}
 
示例13
public int run(String[] args) throws Exception {

    String fooInputPath = args[0];
    String barInputPath = args[1];
    String outputPath = args[2];
    int fooValMax = Integer.parseInt(args[3]);
    int joinValMax = Integer.parseInt(args[4]);
    int numberOfReducers = Integer.parseInt(args[5]);

    Pipeline pipeline = new MRPipeline(JoinFilterExampleCrunch.class, getConf()); //<1>
    
    PCollection<String> fooLines = pipeline.readTextFile(fooInputPath);  //<2>
    PCollection<String> barLines = pipeline.readTextFile(barInputPath);

    PTable<Long, Pair<Long, Integer>> fooTable = fooLines.parallelDo(  //<3>
        new FooIndicatorFn(),
        Avros.tableOf(Avros.longs(),
        Avros.pairs(Avros.longs(), Avros.ints())));

    fooTable = fooTable.filter(new FooFilter(fooValMax));  //<4>

    PTable<Long, Integer> barTable = barLines.parallelDo(new BarIndicatorFn(),
        Avros.tableOf(Avros.longs(), Avros.ints()));

    DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer> joinStrategy =   //<5>
        new DefaultJoinStrategy
          <Long, Pair<Long, Integer>, Integer>
          (numberOfReducers);

    PTable<Long, Pair<Pair<Long, Integer>, Integer>> joinedTable = joinStrategy //<6>
        .join(fooTable, barTable, JoinType.INNER_JOIN);

    PTable<Long, Pair<Pair<Long, Integer>, Integer>> filteredTable = joinedTable.filter(new JoinFilter(joinValMax));

    filteredTable.write(At.textFile(outputPath), WriteMode.OVERWRITE); //<7>

    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
 
示例14
public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-1.0.0-SNAPSHOT-job.jar" + " [generic options] input output");
            System.err.println();
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

        // Reference a given text file as a collection of Strings.
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings into
        // a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

        // Take the collection of words and remove known stop words.
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = noStopWords.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Execute the pipeline as a MapReduce.
        PipelineResult result = pipeline.done();

        return result.succeeded() ? 0 : 1;
    }
 
示例15
@Test
public void testPartitionedSourceAndTargetWritingToTopLevel() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

  writeTestUsers(inputDataset, 10);

  PartitionKey key = new PartitionKey(0);
  Dataset<Record> inputPart0 =
      ((PartitionedDataset<Record>) inputDataset).getPartition(key, false);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputPart0));
  pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(5, datasetSize(outputDataset));

  // check all records are in the correct partition
  Dataset<Record> outputPart0 =
      ((PartitionedDataset<Record>) outputDataset).getPartition(key, false);
  Assert.assertNotNull(outputPart0);
  Assert.assertEquals(5, datasetSize(outputPart0));
}
 
示例16
@Test
public void testViewUris() throws IOException {
  PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
      "username", 2).build();

  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());

  writeTestUsers(inputDataset, 10);

  URI sourceViewUri = new URIBuilder(repo.getUri(), "ns", "in").with("username",
      "test-0").build();
  View<Record> inputView = Datasets.<Record, Dataset<Record>> load(sourceViewUri,
      Record.class);
  Assert.assertEquals(1, datasetSize(inputView));

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(CrunchDatasets
      .asSource(sourceViewUri, GenericData.Record.class));
  URI targetViewUri = new URIBuilder(repo.getUri(), "ns", "out").with(
      "email", "email-0").build();
  pipeline.write(data, CrunchDatasets.asTarget(targetViewUri),
      Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(1, datasetSize(outputDataset));
}
 
示例17
@Test(expected = CrunchRuntimeException.class)
public void testWriteModeDefaultFailsWithExisting() throws IOException {
  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());
  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  writeTestUsers(inputDataset, 1, 0);
  writeTestUsers(outputDataset, 1, 0);

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputDataset));
  pipeline.write(data, CrunchDatasets.asTarget((View<Record>) outputDataset));
}
 
示例18
@Test
public void testSignalReadyOutputView() {
  Assume.assumeTrue(!Hadoop.isHadoop1());
  Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA).build());

  writeTestUsers(inputDataset, 10);

  View<Record> inputView = inputDataset.with("username", "test-8", "test-9");
  View<Record> outputView = outputDataset.with("username", "test-8", "test-9");
  Assert.assertEquals(2, datasetSize(inputView));

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputView));
  pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
  pipeline.run();

  Assert.assertEquals(2, datasetSize(outputView));

  Assert.assertFalse("Output dataset should not be signaled ready",
      ((Signalable)outputDataset).isReady());
  Assert.assertTrue("Output view should be signaled ready",
      ((Signalable)outputView).isReady());
}
 
示例19
private void runCheckpointPipeline(View<Record> inputView,
    View<Record> outputView) {
  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
  PCollection<GenericData.Record> data = pipeline.read(
      CrunchDatasets.asSource(inputView));
  pipeline.write(data, CrunchDatasets.asTarget(outputView),
      Target.WriteMode.CHECKPOINT);
  pipeline.done();
}
 
示例20
@Override
public int run(String[] args) throws Exception {

  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job
  Pipeline pipeline = new MRPipeline(LegacyHdfs2Cass.class, getConf());

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  // Read records from Avro files in inputFolder
  PCollection<ByteBuffer> records =
      pipeline.read(From.avroFile(inputList(input), Avros.records(ByteBuffer.class)));

  // Transform the input
  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert ByteBuffers to ThriftRecords
        .parallelDo(new LegacyHdfsToThrift(), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
        // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  }
  else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert ByteBuffers to CQLRecords
        .parallelDo(new LegacyHdfsToCQL(), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
 
示例21
@Override
public int run(String[] args) throws Exception {

  new JCommander(this, args);

  URI outputUri = URI.create(output);

  // Our crunch job is a MapReduce job
  Configuration conf = getConf();
  conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, Boolean.FALSE);
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, Boolean.FALSE);
  Pipeline pipeline = new MRPipeline(Hdfs2Cass.class, conf);

  // Parse & fetch info about target Cassandra cluster
  CassandraParams params = CassandraParams.parse(outputUri);

  PCollection<GenericRecord> records =
      ((PCollection<GenericRecord>)(PCollection) pipeline.read(From.avroFile(inputList(input))));

  String protocol = outputUri.getScheme();
  if (protocol.equalsIgnoreCase("thrift")) {
    records
        // First convert ByteBuffers to ThriftRecords
        .parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
        // Then group the ThriftRecords in preparation for writing them
        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
        .groupByKey(params.createGroupingOptions())
         // Finally write the ThriftRecords to Cassandra
        .write(new ThriftTarget(outputUri, params));
  }
  else if (protocol.equalsIgnoreCase("cql")) {
    records
        // In case of CQL, convert ByteBuffers to CQLRecords
        .parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
        .by(params.getKeyFn(), Avros.bytes())
        .groupByKey(params.createGroupingOptions())
        .write(new CQLTarget(outputUri, params));
  }

  // Execute the pipeline
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;
}
 
示例22
public PipelineResult run() throws IOException {
  boolean isLocal = (isLocal(from.getDataset()) || isLocal(to.getDataset()));
  if (isLocal) {
    // copy to avoid making changes to the caller's configuration
    Configuration conf = new Configuration(getConf());
    conf.set("mapreduce.framework.name", "local");
    setConf(conf);
  }

  if (isHive(from) || isHive(to)) {
    setConf(addHiveDelegationToken(getConf()));

    // add jars needed for metastore interaction to the classpath
    if (!isLocal) {
      Class<?> fb303Class, thriftClass;
      try {
        // attempt to use libfb303 and libthrift 0.9.2 when async was added
        fb303Class = Class.forName(
            "com.facebook.fb303.FacebookService.AsyncProcessor");
        thriftClass = Class.forName(
            "org.apache.thrift.TBaseAsyncProcessor");
      } catch (ClassNotFoundException e) {
        try {
          // fallback to 0.9.0 or earlier
          fb303Class = Class.forName(
              "com.facebook.fb303.FacebookBase");
          thriftClass = Class.forName(
              "org.apache.thrift.TBase");
        } catch (ClassNotFoundException real) {
          throw new DatasetOperationException(
              "Cannot find thrift dependencies", real);
        }
      }

      TaskUtil.configure(getConf())
          .addJarForClass(Encoder.class) // commons-codec
          .addJarForClass(Log.class) // commons-logging
          .addJarForClass(CompressorInputStream.class) // commons-compress
          .addJarForClass(ApiAdapter.class) // datanucleus-core
          .addJarForClass(JDOAdapter.class) // datanucleus-api-jdo
          .addJarForClass(SQLQuery.class) // datanucleus-rdbms
          .addJarForClass(JDOHelper.class) // jdo-api
          .addJarForClass(Transaction.class) // jta
          .addJarForClass(fb303Class) // libfb303
          .addJarForClass(thriftClass) // libthrift
          .addJarForClass(HiveMetaStore.class) // hive-metastore
          .addJarForClass(HiveConf.class); // hive-exec
    }
  }

  PType<T> toPType = ptype(to);
  MapFn<T, T> validate = new CheckEntityClass<T>(to.getType());

  Pipeline pipeline = new MRPipeline(getClass(), getConf());

  PCollection<T> collection = pipeline.read(CrunchDatasets.asSource(from))
      .parallelDo(transform, toPType).parallelDo(validate, toPType);

  if (compact) {
    // the transform must be run before partitioning
    collection = CrunchDatasets.partition(collection, to, numWriters, numPartitionWriters);
  }

  pipeline.write(collection, CrunchDatasets.asTarget(to), mode);

  PipelineResult result = pipeline.done();

  StageResult sr = Iterables.getFirst(result.getStageResults(), null);
  if (sr != null && MAP_INPUT_RECORDS != null) {
    this.count = sr.getCounterValue(MAP_INPUT_RECORDS);
  }

  return result;
}
 
示例23
@Test
public void testUseReaderSchema() throws IOException {

  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the dataset
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());
  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .schema(oldRecordSchema).build());
  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();

  try {

    writer.write(oldUser);

  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // read data from updated dataset that has the new schema.
  // At this point, User class has the old schema
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));

  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));

  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();

  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());

  try {

    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());

  } finally {
    reader.close();
  }
}
 
示例24
@Test
public void testUseReaderSchemaParquet() throws IOException {

  // Create a schema with only a username, so we can test reading it
  // with an enhanced record structure.
  Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
      .fields()
      .requiredString("username")
      .endRecord();

  // create the dataset
  Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());

  Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
      .format(Formats.PARQUET).schema(oldRecordSchema).build());
  Record oldUser = new Record(oldRecordSchema);
  oldUser.put("username", "user");

  DatasetWriter<Record> writer = in.newWriter();

  try {

    writer.write(oldUser);

  } finally {
    writer.close();
  }

  Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);

  // read data from updated dataset that has the new schema.
  // At this point, User class has the old schema
  PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
      NewUserRecord.class));

  PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
      Avros.records(NewUserRecord.class));

  pipeline.write(processed, CrunchDatasets.asTarget(out));

  DatasetReader reader = out.newReader();

  Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());

  try {

    // there should be one record that is equal to our old user generic record.
    Assert.assertEquals(oldUser, reader.next());
    Assert.assertFalse(reader.hasNext());

  } finally {
    reader.close();
  }
}