提问者:小点点

通过Google Cloud Dataflow创建/写入经过调理的BigQuery表


我想利用时间分区表的新BigQuery功能,但我不确定这在1.6版本的数据流SDK中是否可行。

查看BigQueryJSONAPI,要创建天分区表,需要传入

"timePartitioning": { "type": "DAY" }

选项,但com.google.cloud.dataflow.sdk.io. BigQueryIO接口只允许指定TableAccess。

我想也许我可以预创建表,并通过BigQueryIO. Write.toTable参考lambda偷偷进入分区装饰器。?还有人通过数据流成功创建/写入分区表吗?

这似乎与设置当前也不可用的表过期时间类似。


共3个答案

匿名用户

正如Pavan所说,使用Dataflow写入分区表绝对是可能的。您使用的是DataflowPipelineRunner以流模式还是批处理模式运行?

您提出的解决方案应该可以工作。具体来说,如果您预先创建了一个设置了日期分区的表,那么您可以使用BigQueryIO. Write.toTableAccesslambda写入日期分区。例如:

/**
 * A Joda-time formatter that prints a date in format like {@code "20160101"}.
 * Threadsafe.
 */
private static final DateTimeFormatter FORMATTER =
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
    String.format("%s$%s", baseTableName, FORMATTER.print(instant));

匿名用户

我采用的方法(也适用于流式模式):

>

  • 为传入的记录定义一个自定义窗口
  • 将窗口转换为表/分区名

    p.apply(PubsubIO.Read
                .subscription(subscription)
                .withCoder(TableRowJsonCoder.of())
            )
            .apply(Window.into(new TablePartitionWindowFn()) )
            .apply(BigQueryIO.Write
                           .to(new DayPartitionFunc(dataset, table))
                           .withSchema(schema)
                           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
    

    根据传入数据设置窗口,可以忽略End Instant,因为start值用于设置分区:

    public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
    
    private IntervalWindow assignWindow(AssignContext context) {
        TableRow source = (TableRow) context.element();
        String dttm_str = (String) source.get("DTTM");
    
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
    
        Instant start_point = Instant.parse(dttm_str,formatter);
        Instant end_point = start_point.withDurationAdded(1000, 1);
    
        return new IntervalWindow(start_point, end_point);
    };
    
    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
    
    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }
    
    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }
    
    @Override
    public IntervalWindow getSideInputWindow(BoundedWindow window) {
        if (window instanceof GlobalWindow) {
            throw new IllegalArgumentException(
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        return null;
    }
    

    动态设置表分区:

    public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
    
    String destination = "";
    
    public DayPartitionFunc(String dataset, String table) {
        this.destination = dataset + "." + table+ "$";
    }
    
    @Override
    public String apply(BoundedWindow boundedWindow) {
        // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
        String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                         .withZone(DateTimeZone.UTC)
                                         .print(((IntervalWindow) boundedWindow).start());
        return destination + dayString;
    }}
    

    有没有更好的方法来实现同样的结果?

  • 匿名用户

    我相信当您不使用流式传输时,应该可以使用分区装饰器。我们正在积极努力通过流式传输支持分区装饰器。如果您今天在非流式传输模式下看到任何错误,请告诉我们。