Java Source Code Examples: org.apache.crunch.impl.mr.MRPipeline
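MRPipeline is the MapReduce-backed implementation of Crunch's Pipeline interface: you construct it with a class that identifies the job jar (and optionally a Hadoop Configuration), read sources into PCollections, transform them, write them to targets, and execute the planned jobs with run() or done(). Most of the examples below drive it through Kite's CrunchDatasets adapter. As a minimal, hedged orientation sketch of the bare lifecycle (class name and argument layout here are illustrative, not taken from the examples):
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class MinimalMRPipelineSketch {
  public static void main(String[] args) {
    // The class argument tells Crunch which jar to ship to the cluster.
    Pipeline pipeline = new MRPipeline(MinimalMRPipelineSketch.class, new Configuration());
    // Read a text file into a PCollection of lines (paths are illustrative).
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // Write the collection back out and run the planned MapReduce job(s).
    pipeline.writeTextFile(lines, args[1]);
    PipelineResult result = pipeline.done();
    System.exit(result.succeeded() ? 0 : 1);
  }
}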
Example 1: copy a generic HBase-backed dataset into another dataset with Target.WriteMode.APPEND.
@Test
public void testGeneric() throws IOException {
String datasetName = tableName + ".TestGenericEntity";
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaLiteral(testGenericEntity)
.build();
Dataset<GenericRecord> inputDataset = repo.create("default", "in", descriptor);
Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);
writeRecords(inputDataset, 10);
Pipeline pipeline = new MRPipeline(TestCrunchDatasetsHBase.class, HBaseTestUtils.getConf());
PCollection<GenericRecord> data = pipeline.read(
CrunchDatasets.asSource(inputDataset));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
checkRecords(outputDataset, 10, 0);
}
Example 2: read a key-bounded view of an HBase-backed dataset and append it to the output dataset.
@Test
public void testSourceView() throws IOException {
String datasetName = tableName + ".TestGenericEntity";
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaLiteral(testGenericEntity)
.build();
Dataset<GenericRecord> inputDataset = repo.create("default", "in", descriptor);
Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);
writeRecords(inputDataset, 10);
View<GenericRecord> inputView = inputDataset
.from("part1", new Utf8("part1_2")).to("part1", new Utf8("part1_7"))
.from("part2", new Utf8("part2_2")).to("part2", new Utf8("part2_7"));
Assert.assertEquals(6, datasetSize(inputView));
Pipeline pipeline = new MRPipeline(TestCrunchDatasetsHBase.class, HBaseTestUtils.getConf());
PCollection<GenericRecord> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
checkRecords(outputDataset, 6, 2);
}
Example 3: copy a generic Avro dataset, appending all records to the output dataset.
@Test
public void testGeneric() throws IOException {
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
// write two files, each of 5 records
writeTestUsers(inputDataset, 5, 0);
writeTestUsers(inputDataset, 5, 5);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputDataset));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
checkTestUsers(outputDataset, 10);
}
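Examples 3 through 19 rely on test helpers (writeTestUsers, checkTestUsers, datasetSize) and a USER_SCHEMA constant that are not reproduced on this page. The sketch below is a hedged re-creation of what such helpers might look like; it assumes a schema with username and email string fields (the field names the view-based examples filter on), while the real helpers in the Kite test sources may differ in detail.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.View;

class TestUserHelpers {
  // Assumed minimal user schema; the real USER_SCHEMA may carry more fields.
  static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
      .requiredString("username")
      .requiredString("email")
      .endRecord();

  static void writeTestUsers(View<GenericData.Record> view, int count) {
    writeTestUsers(view, count, 0);
  }

  static void writeTestUsers(View<GenericData.Record> view, int count, int start) {
    // Write count records named test-<i> / email-<i>, starting at the given offset.
    DatasetWriter<GenericData.Record> writer = view.newWriter();
    try {
      for (int i = start; i < count + start; i++) {
        writer.write(new GenericRecordBuilder(USER_SCHEMA)
            .set("username", "test-" + i)
            .set("email", "email-" + i)
            .build());
      }
    } finally {
      writer.close();
    }
  }

  static int datasetSize(View<GenericData.Record> view) {
    // Count the records visible through the dataset or view.
    int size = 0;
    DatasetReader<GenericData.Record> reader = view.newReader();
    try {
      while (reader.hasNext()) {
        reader.next();
        size++;
      }
    } finally {
      reader.close();
    }
    return size;
  }
}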
Example 4: the same copy as Example 3, with both datasets stored as Parquet.
@Test
public void testGenericParquet() throws IOException {
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).format(Formats.PARQUET).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).format(Formats.PARQUET).build());
// write two files, each of 5 records
writeTestUsers(inputDataset, 5, 0);
writeTestUsers(inputDataset, 5, 5);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputDataset));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
checkTestUsers(outputDataset, 10);
}
Example 5: use a single partition of a hash-partitioned dataset as the pipeline source.
@Test
public void testPartitionedSource() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).format(Formats.PARQUET).build());
writeTestUsers(inputDataset, 10);
PartitionKey key = new PartitionKey(0);
Dataset<Record> inputPart0 =
((PartitionedDataset<Record>) inputDataset).getPartition(key, false);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputPart0));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(5, datasetSize(outputDataset));
}
Example 6: read one partition and write into the corresponding partition of a partitioned target dataset.
@Test
public void testPartitionedSourceAndTarget() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
writeTestUsers(inputDataset, 10);
PartitionKey key = new PartitionKey(0);
Dataset<Record> inputPart0 =
((PartitionedDataset<Record>) inputDataset).getPartition(key, false);
Dataset<Record> outputPart0 =
((PartitionedDataset<Record>) outputDataset).getPartition(key, true);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputPart0));
pipeline.write(data, CrunchDatasets.asTarget(outputPart0), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(5, datasetSize(outputPart0));
}
Example 7: use a view filtered to a single username as the pipeline source.
@Test
public void testSourceView() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).format(Formats.PARQUET).build());
writeTestUsers(inputDataset, 10);
View<Record> inputView = inputDataset.with("username", "test-0");
Assert.assertEquals(1, datasetSize(inputView));
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(1, datasetSize(outputDataset));
}
Example 8: write into a view of a partitioned target dataset.
@Test
public void testTargetView() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
writeTestUsers(inputDataset, 10);
View<Record> inputView = inputDataset.with("username", "test-0");
Assert.assertEquals(1, datasetSize(inputView));
View<Record> outputView = outputDataset.with("username", "test-0");
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(1, datasetSize(outputDataset));
}
Example 9: write into a view of a dataset that uses a provided ("version") partition.
@Test
public void testTargetViewProvidedPartition() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().provided("version").build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
View<Record> inputView = inputDataset.with("version", "test-version-0");
writeTestUsers(inputView, 1);
Assert.assertEquals(1, datasetSize(inputView));
View<Record> outputView = outputDataset.with("version", "test-version-0");
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(1, datasetSize(outputDataset));
}
Example 10: address the source and target datasets by repository URI instead of Dataset objects.
@Test
public void testDatasetUris() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
writeTestUsers(inputDataset, 10);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(new URIBuilder(repo.getUri(), "ns", "in").build(),
GenericData.Record.class));
pipeline.write(data, CrunchDatasets.asTarget(
new URIBuilder(repo.getUri(), "ns", "out").build()), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(10, datasetSize(outputDataset));
}
Example 11: replace existing output with Target.WriteMode.OVERWRITE.
@Test
public void testWriteModeOverwrite() throws IOException {
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
writeTestUsers(inputDataset, 1, 0);
writeTestUsers(outputDataset, 1, 1);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputDataset));
pipeline.write(data, CrunchDatasets.asTarget((View<Record>) outputDataset),
Target.WriteMode.OVERWRITE);
pipeline.run();
checkTestUsers(outputDataset, 1);
}
Example 12: union two input datasets and append the combined collection to a single output dataset.
@Test
public void testMultipleFileReadingFromCrunch() throws IOException {
Dataset<Record> inputDatasetA = repo.create("ns", "inA", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> inputDatasetB = repo.create("ns", "inB", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
// write two files, each of 5 records
writeTestUsers(inputDatasetA, 5, 0);
writeTestUsers(inputDatasetB, 5, 5);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> dataA = pipeline.read(
CrunchDatasets.asSource(inputDatasetA));
PCollection<GenericData.Record> dataB = pipeline.read(
CrunchDatasets.asSource(inputDatasetB));
pipeline.write(dataA.union(dataB), CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
checkTestUsers(outputDataset, 10);
}
Example 13: a join-and-filter pipeline built with DefaultJoinStrategy.
public int run(String[] args) throws Exception {
String fooInputPath = args[0];
String barInputPath = args[1];
String outputPath = args[2];
int fooValMax = Integer.parseInt(args[3]);
int joinValMax = Integer.parseInt(args[4]);
int numberOfReducers = Integer.parseInt(args[5]);
Pipeline pipeline = new MRPipeline(JoinFilterExampleCrunch.class, getConf()); //<1>
PCollection<String> fooLines = pipeline.readTextFile(fooInputPath); //<2>
PCollection<String> barLines = pipeline.readTextFile(barInputPath);
PTable<Long, Pair<Long, Integer>> fooTable = fooLines.parallelDo( //<3>
new FooIndicatorFn(),
Avros.tableOf(Avros.longs(),
Avros.pairs(Avros.longs(), Avros.ints())));
fooTable = fooTable.filter(new FooFilter(fooValMax)); //<4>
PTable<Long, Integer> barTable = barLines.parallelDo(new BarIndicatorFn(),
Avros.tableOf(Avros.longs(), Avros.ints()));
DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer> joinStrategy = //<5>
new DefaultJoinStrategy<Long, Pair<Long, Integer>, Integer>(numberOfReducers);
PTable<Long, Pair<Pair<Long, Integer>, Integer>> joinedTable = joinStrategy //<6>
.join(fooTable, barTable, JoinType.INNER_JOIN);
PTable<Long, Pair<Pair<Long, Integer>, Integer>> filteredTable = joinedTable.filter(new JoinFilter(joinValMax));
filteredTable.write(At.textFile(outputPath), WriteMode.OVERWRITE); //<7>
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
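Example 13 references several functions (FooIndicatorFn, BarIndicatorFn, FooFilter, JoinFilter) whose bodies are not included here. Below is a hedged sketch of what the first pair might look like; the tab-separated "id value" line layout is an assumption for illustration, not taken from the example.
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.Pair;

// Hypothetical: parse a foo line into (key, (key, value)) so the value side
// still carries the original key after the join.
class FooIndicatorFn extends DoFn<String, Pair<Long, Pair<Long, Integer>>> {
  @Override
  public void process(String line, Emitter<Pair<Long, Pair<Long, Integer>>> emitter) {
    String[] fields = line.split("\t");
    long key = Long.parseLong(fields[0]);
    int value = Integer.parseInt(fields[1]);
    emitter.emit(Pair.of(key, Pair.of(key, value)));
  }
}

// Hypothetical: keep only foo entries whose value does not exceed fooValMax.
class FooFilter extends FilterFn<Pair<Long, Pair<Long, Integer>>> {
  private final int fooValMax;

  FooFilter(int fooValMax) {
    this.fooValMax = fooValMax;
  }

  @Override
  public boolean accept(Pair<Long, Pair<Long, Integer>> input) {
    return input.second().second() <= fooValMax;
  }
}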
Example 14: the classic Crunch WordCount pipeline with a stop-word filter.
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: hadoop jar crunch-1.0.0-SNAPSHOT-job.jar" + " [generic options] input output");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
// Take the collection of words and remove known stop words.
PCollection<String> noStopWords = words.filter(new StopWordFilter());
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
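The WordCount pipeline depends on two small functions, Tokenizer and StopWordFilter, that are not reproduced on this page. A hedged sketch follows; whitespace tokenization and the hard-coded stop-word set are illustrative assumptions.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;

// Hypothetical tokenizer: split each line on whitespace and emit each word.
class Tokenizer extends DoFn<String, String> {
  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : line.split("\\s+")) {
      if (!word.isEmpty()) {
        emitter.emit(word);
      }
    }
  }
}

// Hypothetical stop-word filter: drop a small set of very common words.
class StopWordFilter extends FilterFn<String> {
  private static final Set<String> STOP_WORDS =
      new HashSet<String>(Arrays.asList("a", "and", "are", "is", "of", "the", "to"));

  @Override
  public boolean accept(String word) {
    return !STOP_WORDS.contains(word.toLowerCase());
  }
}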
Example 15: read one partition and write to the top level of a partitioned target; records land in the matching output partition.
@Test
public void testPartitionedSourceAndTargetWritingToTopLevel() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
writeTestUsers(inputDataset, 10);
PartitionKey key = new PartitionKey(0);
Dataset<Record> inputPart0 =
((PartitionedDataset<Record>) inputDataset).getPartition(key, false);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputPart0));
pipeline.write(data, CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(5, datasetSize(outputDataset));
// check all records are in the correct partition
Dataset<Record> outputPart0 =
((PartitionedDataset<Record>) outputDataset).getPartition(key, false);
Assert.assertNotNull(outputPart0);
Assert.assertEquals(5, datasetSize(outputPart0));
}
Example 16: address the source and target views by view URI.
@Test
public void testViewUris() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
"username", 2).build();
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).partitionStrategy(partitionStrategy).build());
writeTestUsers(inputDataset, 10);
URI sourceViewUri = new URIBuilder(repo.getUri(), "ns", "in").with("username",
"test-0").build();
View<Record> inputView = Datasets.<Record, Dataset<Record>> load(sourceViewUri,
Record.class);
Assert.assertEquals(1, datasetSize(inputView));
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(CrunchDatasets
.asSource(sourceViewUri, GenericData.Record.class));
URI targetViewUri = new URIBuilder(repo.getUri(), "ns", "out").with(
"email", "email-0").build();
pipeline.write(data, CrunchDatasets.asTarget(targetViewUri),
Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(1, datasetSize(outputDataset));
}
Example 17: the default write mode fails when the target dataset already contains data.
@Test(expected = CrunchRuntimeException.class)
public void testWriteModeDefaultFailsWithExisting() throws IOException {
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
writeTestUsers(inputDataset, 1, 0);
writeTestUsers(outputDataset, 1, 0);
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputDataset));
pipeline.write(data, CrunchDatasets.asTarget((View<Record>) outputDataset));
}
Example 18: a successful write signals the target view, not the whole dataset, as ready.
@Test
public void testSignalReadyOutputView() {
Assume.assumeTrue(!Hadoop.isHadoop1());
Dataset<Record> inputDataset = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(USER_SCHEMA).build());
writeTestUsers(inputDataset, 10);
View<Record> inputView = inputDataset.with("username", "test-8", "test-9");
View<Record> outputView = outputDataset.with("username", "test-8", "test-9");
Assert.assertEquals(2, datasetSize(inputView));
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputView), Target.WriteMode.APPEND);
pipeline.run();
Assert.assertEquals(2, datasetSize(outputView));
Assert.assertFalse("Output dataset should not be signaled ready",
((Signalable)outputDataset).isReady());
Assert.assertTrue("Output view should be signaled ready",
((Signalable)outputView).isReady());
}
Example 19: a helper that runs a pipeline writing with Target.WriteMode.CHECKPOINT.
private void runCheckpointPipeline(View<Record> inputView,
View<Record> outputView) {
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(
CrunchDatasets.asSource(inputView));
pipeline.write(data, CrunchDatasets.asTarget(outputView),
Target.WriteMode.CHECKPOINT);
pipeline.done();
}
Example 20: load Avro-encoded records from HDFS and write them to Cassandra over Thrift or CQL.
@Override
public int run(String[] args) throws Exception {
new JCommander(this, args);
URI outputUri = URI.create(output);
// Our crunch job is a MapReduce job
Pipeline pipeline = new MRPipeline(LegacyHdfs2Cass.class, getConf());
// Parse & fetch info about target Cassandra cluster
CassandraParams params = CassandraParams.parse(outputUri);
// Read records from Avro files in inputFolder
PCollection<ByteBuffer> records =
pipeline.read(From.avroFile(inputList(input), Avros.records(ByteBuffer.class)));
// Transform the input
String protocol = outputUri.getScheme();
if (protocol.equalsIgnoreCase("thrift")) {
records
// First convert ByteBuffers to ThriftRecords
.parallelDo(new LegacyHdfsToThrift(), ThriftRecord.PTYPE)
// Then group the ThriftRecords in preparation for writing them
.parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
.groupByKey(params.createGroupingOptions())
// Finally write the ThriftRecords to Cassandra
.write(new ThriftTarget(outputUri, params));
}
else if (protocol.equalsIgnoreCase("cql")) {
records
// In case of CQL, convert ByteBuffers to CQLRecords
.parallelDo(new LegacyHdfsToCQL(), CQLRecord.PTYPE)
.by(params.getKeyFn(), Avros.bytes())
.groupByKey(params.createGroupingOptions())
.write(new CQLTarget(outputUri, params));
}
// Execute the pipeline
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
Example 21: the GenericRecord variant of Example 20, with speculative execution disabled.
@Override
public int run(String[] args) throws Exception {
new JCommander(this, args);
URI outputUri = URI.create(output);
// Our crunch job is a MapReduce job
Configuration conf = getConf();
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, Boolean.FALSE);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, Boolean.FALSE);
Pipeline pipeline = new MRPipeline(Hdfs2Cass.class, conf);
// Parse & fetch info about target Cassandra cluster
CassandraParams params = CassandraParams.parse(outputUri);
PCollection<GenericRecord> records =
((PCollection<GenericRecord>)(PCollection) pipeline.read(From.avroFile(inputList(input))));
String protocol = outputUri.getScheme();
if (protocol.equalsIgnoreCase("thrift")) {
records
// First convert GenericRecords to ThriftRecords
.parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
// Then group the ThriftRecords in preparation for writing them
.parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
.groupByKey(params.createGroupingOptions())
// Finally write the ThriftRecords to Cassandra
.write(new ThriftTarget(outputUri, params));
}
else if (protocol.equalsIgnoreCase("cql")) {
records
// In case of CQL, convert GenericRecords to CQLRecords
.parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
.by(params.getKeyFn(), Avros.bytes())
.groupByKey(params.createGroupingOptions())
.write(new CQLTarget(outputUri, params));
}
// Execute the pipeline
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
Example 22: a dataset copy/transform task that adds Hive and Thrift dependencies to the classpath before running the pipeline.
public PipelineResult run() throws IOException {
boolean isLocal = (isLocal(from.getDataset()) || isLocal(to.getDataset()));
if (isLocal) {
// copy to avoid making changes to the caller's configuration
Configuration conf = new Configuration(getConf());
conf.set("mapreduce.framework.name", "local");
setConf(conf);
}
if (isHive(from) || isHive(to)) {
setConf(addHiveDelegationToken(getConf()));
// add jars needed for metastore interaction to the classpath
if (!isLocal) {
Class<?> fb303Class, thriftClass;
try {
// attempt to use libfb303 and libthrift 0.9.2 when async was added
fb303Class = Class.forName(
"com.facebook.fb303.FacebookService.AsyncProcessor");
thriftClass = Class.forName(
"org.apache.thrift.TBaseAsyncProcessor");
} catch (ClassNotFoundException e) {
try {
// fallback to 0.9.0 or earlier
fb303Class = Class.forName(
"com.facebook.fb303.FacebookBase");
thriftClass = Class.forName(
"org.apache.thrift.TBase");
} catch (ClassNotFoundException real) {
throw new DatasetOperationException(
"Cannot find thrift dependencies", real);
}
}
TaskUtil.configure(getConf())
.addJarForClass(Encoder.class) // commons-codec
.addJarForClass(Log.class) // commons-logging
.addJarForClass(CompressorInputStream.class) // commons-compress
.addJarForClass(ApiAdapter.class) // datanucleus-core
.addJarForClass(JDOAdapter.class) // datanucleus-api-jdo
.addJarForClass(SQLQuery.class) // datanucleus-rdbms
.addJarForClass(JDOHelper.class) // jdo-api
.addJarForClass(Transaction.class) // jta
.addJarForClass(fb303Class) // libfb303
.addJarForClass(thriftClass) // libthrift
.addJarForClass(HiveMetaStore.class) // hive-metastore
.addJarForClass(HiveConf.class); // hive-exec
}
}
PType<T> toPType = ptype(to);
MapFn<T, T> validate = new CheckEntityClass<T>(to.getType());
Pipeline pipeline = new MRPipeline(getClass(), getConf());
PCollection<T> collection = pipeline.read(CrunchDatasets.asSource(from))
.parallelDo(transform, toPType).parallelDo(validate, toPType);
if (compact) {
// the transform must be run before partitioning
collection = CrunchDatasets.partition(collection, to, numWriters, numPartitionWriters);
}
pipeline.write(collection, CrunchDatasets.asTarget(to), mode);
PipelineResult result = pipeline.done();
StageResult sr = Iterables.getFirst(result.getStageResults(), null);
if (sr != null && MAP_INPUT_RECORDS != null) {
this.count = sr.getCounterValue(MAP_INPUT_RECORDS);
}
return result;
}
Example 23: read records written with an older schema using a newer reader schema.
@Test
public void testUseReaderSchema() throws IOException {
// Create a schema with only a username, so we can test reading it
// with an enhanced record structure.
Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
.fields()
.requiredString("username")
.endRecord();
// create the dataset
Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
.schema(oldRecordSchema).build());
Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
.schema(oldRecordSchema).build());
Record oldUser = new Record(oldRecordSchema);
oldUser.put("username", "user");
DatasetWriter<Record> writer = in.newWriter();
try {
writer.write(oldUser);
} finally {
writer.close();
}
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
// read data from updated dataset that has the new schema.
// At this point, User class has the old schema
PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
NewUserRecord.class));
PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
Avros.records(NewUserRecord.class));
pipeline.write(processed, CrunchDatasets.asTarget(out));
DatasetReader reader = out.newReader();
Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());
try {
// there should be one record that is equal to our old user generic record.
Assert.assertEquals(oldUser, reader.next());
Assert.assertFalse(reader.hasNext());
} finally {
reader.close();
}
}
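Example 23 (and the Parquet variant that follows) pipes the data through a UserRecordIdentityFn over a NewUserRecord class that is assumed to be generated from the enhanced schema; neither class is shown here. A hedged sketch of the identity function, whose only job is to force the records through the new reader schema:
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Hypothetical identity DoFn: emits each NewUserRecord unchanged, so the
// pipeline materializes records using the new (reader) schema.
class UserRecordIdentityFn extends DoFn<NewUserRecord, NewUserRecord> {
  @Override
  public void process(NewUserRecord input, Emitter<NewUserRecord> emitter) {
    emitter.emit(input);
  }
}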
Example 24: the Parquet variant of Example 23.
@Test
public void testUseReaderSchemaParquet() throws IOException {
// Create a schema with only a username, so we can test reading it
// with an enhanced record structure.
Schema oldRecordSchema = SchemaBuilder.record("org.kitesdk.data.user.OldUserRecord")
.fields()
.requiredString("username")
.endRecord();
// create the dataset
Dataset<Record> in = repo.create("ns", "in", new DatasetDescriptor.Builder()
.format(Formats.PARQUET).schema(oldRecordSchema).build());
Dataset<Record> out = repo.create("ns", "out", new DatasetDescriptor.Builder()
.format(Formats.PARQUET).schema(oldRecordSchema).build());
Record oldUser = new Record(oldRecordSchema);
oldUser.put("username", "user");
DatasetWriter<Record> writer = in.newWriter();
try {
writer.write(oldUser);
} finally {
writer.close();
}
Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
// read data from updated dataset that has the new schema.
// At this point, User class has the old schema
PCollection<NewUserRecord> data = pipeline.read(CrunchDatasets.asSource(in.getUri(),
NewUserRecord.class));
PCollection<NewUserRecord> processed = data.parallelDo(new UserRecordIdentityFn(),
Avros.records(NewUserRecord.class));
pipeline.write(processed, CrunchDatasets.asTarget(out));
DatasetReader reader = out.newReader();
Assert.assertTrue("Pipeline failed.", pipeline.run().succeeded());
try {
// there should be one record that is equal to our old user generic record.
Assert.assertEquals(oldUser, reader.next());
Assert.assertFalse(reader.hasNext());
} finally {
reader.close();
}
}