如何使用Apache Beam,谷歌数据流和java将具有未知json属性的大型jsonl文件转换为csv
这是我的场景:
任何帮助或指导都会有所帮助,因为我是Apache Beam的新手,尽管我正在阅读Apache Beam的文档。
我已经用示例JSONL数据编辑了问题
{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}
{"Name":"May", "Session":"2012B", "Score":"14", "Completed":"false"}
{"Name":"Deloise", "Session":"2012A", "Score":"19", "Completed":"true"}
虽然json key存在于输入文件中,但在转换时它是未知的。我将通过一个例子来解释这一点,假设我有三个客户端,每个客户端都有自己的google存储空间,因此每个客户端都上传了具有不同json属性的自己的jsonl文件。
客户端1:输入Jsonl文件
{"city":"Mumbai", "pincode":"2012A"}
{"city":"Delhi", "pincode":"2012N"}
客户端2:输入Jsonl文件
{"Relation":"Finance", "Code":"2012A"}
{"Relation":"Production", "Code":"20XXX"}
客户端3:输入Jsonl文件
{"Name":"Gilbert", "Session":"2013", "Score":"24", "Completed":"true"}
{"Name":"Alexa", "Session":"2013", "Score":"29", "Completed":"true"}
问题:我如何编写一个通用的光束管道,它可以将所有三个转换为如下所示
客户端1:输出CSV文件
["city", "pincode"]
["Mumbai","2012A"]
["Delhi", "2012N"]
客户端2:输出CSV文件
["Relation", "Code"]
["Finance", "2012A"]
["Production","20XXX"]
客户端3:输出CSV文件
["Name", "Session", "Score", "true"]
["Gilbert", "2013", "24", "true"]
["Alexa", "2013", "29", "true"]
编辑:删除了之前的问题,因为问题已通过示例进行了修改。
任何人都没有提供通用的方法来实现这样的结果。您必须根据您的需求和处理管道的方式自己编写逻辑。
下面有一些示例,但您需要针对您的情况验证这些示例,因为我只在一个小的JSONL文件上尝试过这些示例。
方法1
如果您可以收集输出csv的标头值,那么它会容易得多。但是事先获取标头本身又是一个挑战。
//pipeline
pipeline.apply("ReadJSONLines",
TextIO.read().from("FILE URL"))
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processLines(@Element String line, OutputReceiver<String> receiver) {
String values = getCsvLine(line, false);
receiver.output(values);
}
}))
.apply("WriteCSV",
TextIO.write().to("FileName")
.withSuffix(".csv")
.withoutSharding()
.withDelimiter(new char[] { '\r', '\n' })
.withHeader(getHeader()));
private static String getHeader() {
String header = "";
//your logic to get the header line.
return header;
}
获取标题行的可能方法(仅假设在您的情况下可能不起作用):
方法2
这是我为小JsonFiles(~10k行)找到的解决方法。下面的示例可能不适用于大文件。
final int[] count = { 0 };
pipeline.apply(//read file)
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processLines(@Element String line, OutputReceiver<String> receiver) {
// check if its the first processing element. If yes then create the header
if (count[0] == 0) {
String header = getCsvLine(line, true);
receiver.output(header);
count[0]++;
}
String values = getCsvLine(line, false);
receiver.output(values);
}
}))
.apply(//write file)
正如Saransh在使用FileIO的评论中提到的,您所要做的就是手动逐行读取JSONL,然后将其转换为逗号分隔的format.EG:
pipeline.apply(FileIO.match().filepattern("FILE PATH"))
.apply(FileIO.readMatches())
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((FileIO.ReadableFile f) -> {
List<String> output = new ArrayList<>();
try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
String line = br.readLine();
while (line != null) {
if (output.size() == 0) {
String header = getCsvLine(line, true);
output.add(header);
}
String result = getCsvLine(line, false);
output.add(result);
line = br.readLine();
}
} catch (IOException e) {
throw new RuntimeException("Error while reading", e);
}
return output;
}))
.apply(//write to gcs)
在上面的例子中,我使用了一个getCsvLine
方法(为代码可用性而创建),它从文件中获取一行并将其转换为逗号分隔的format.To解析我使用GSON的JSON对象。
/**
* @param line take each JSONL line
* @param isHeader true : Returns output combining the JSON keys || false:
* Returns output combining the JSON values
**/
public static String getCsvLine(String line, boolean isHeader) {
List<String> values = new ArrayList<>();
// convert the line into jsonobject
JsonObject jsonObject = JsonParser.parseString(line).getAsJsonObject();
// iterate json object and collect all values
for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
if (isHeader)
values.add(entry.getKey());
else
values.add(entry.getValue().getAsString());
}
String result = String.join(",", values);
return result;
}