提问者:小点点

在批处理管道中,如何将时间戳分配给来自批处理源的数据,例如Beam管道中的csv文件


我正在批处理管道中从有界源(csv文件)读取数据,并希望根据csv文件中作为列存储的数据为元素分配时间戳。如何在Apache Beam管道中执行此操作?


共1个答案

匿名用户

例如,如果您的批处理数据源包含每个元素的基于事件的时间戳,则您有一个单击事件,该事件具有元组<code>{‘timestamp,‘userid‘,‘ClickedSomething‘}</code>。您可以将时间戳分配给管道中的DoFn内的元素。

Java:

public void process(ProcessContext c){
     c.outputWithTimestamp(
         c.element(), 
         new Instant(c.element().getTimestamp()));
}

蟒蛇:

'AddEventTimestamps' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))

[编辑Beam指南中的非lambda Python示例:]

class AddTimestampDoFn(beam.DoFn):

  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

[根据安东评论编辑]更多信息可以找到@

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements