Asked by: 小点点

"finish_bundle"方法执行多次:Apache光束,谷歌数据流


I am trying to create batches of JSON files with 100 records each, using an Apache Beam pipeline run as a Google Dataflow job.

I am reading records from BigQuery and trying to create JSON files, each holding 100 records, i.e. batch_size = 100.

So I expect 7 JSON files to be created when the Dataflow job reads 700 records from BQ, but I see more files being created and the batch sizes are not what I expect.

我希望"finish_bundle"方法执行一次,但我看到它通过创建记录小于100的JSON批处理文件多次执行。

Below are the log details from the current DF execution:

batch id - 1 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (finish_bundle) - 43 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 1 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 2 - length of batch (finish_bundle) - 66 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 1 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 2 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json
batch id - 3 - length of batch (finish_bundle) - 61 - file name - my_bucket/29_09_2021/jbatch_20210929_093512.json
batch id - 1 - length of batch (finish_bundle) - 30 - file name - my_bucket/29_09_2021/jbatch_20210929_093525.json

I expect the batch JSON files to be created in the correct sequence and with a batch size of 100 records each, like this:

batch id - 1 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 4 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 5 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 6 - length of batch (process)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 7 - length of batch (finish_bundle)- 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json

Here is the pipeline code that creates the JSON batch files and stores them in the GCS bucket.

import os
import json
import apache_beam as beam
import logging

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions


class CreateJSONBatch:

    def execute_pipeline(self):
        try:
            query = "SELECT id, name, region, country, language, pin-code  FROM `project.dataset.table` LIMIT 700"

            beam_options = {
                "project": "<project>",
                "region": "<region>",
                "job_name": "create_json_batch",
                "runner": "DataflowRunner",
                "temp_location": f"gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                raw_data = (
                        pipeline | 'Read from BQ ' >> beam.io.ReadFromBigQuery(query=query,
                                                                                use_standard_sql=True)
                )
                _ = (raw_data | 'Create JSON batch files ' >> beam.ParDo(CreateBatch()))

        except Exception as e:
            logging.error(f"Exception in execute_pipeline - {str(e)}")


class CreateBatch(beam.DoFn):

    def __init__(self):
        self.project = None
        self.region = None
        self.batch_size = None
        self.data_bucket = None
        self.json_folder = None
        self.batch_id = 0
        self.json_batch = []

    def get_file_name(self):
        try:
            cur_time = datetime.now()
            date_folder = f"{cur_time.strftime('%d_%m_%Y')}"
            file_name = cur_time.strftime('%Y%m%d_%H%M%S')
            file_name = os.path.join(self.data_bucket, self.json_folder, date_folder, f"jbatch_{file_name}.json")
            return file_name    # file_name -my_bucket/folder_to_store_json_files/29_09_2021/jbatch_20210929_060346.json
        except Exception as e:
            logging.error(f"Exception in CreateBatch.get_file_name - {str(e)}")

    def create_json_files(self, json_file):
        try:
            json_file = f"gs://{json_file}"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "runner": "DataflowRunner",
                "temp_location": f"gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline_for_json:
                data = (
                        pipeline_for_json
                        | 'Create pcollection' >> beam.Create(self.json_batch)
                        | 'Write Output' >> beam.io.WriteToText(json_file, shard_name_template='')
                )
        except Exception as e:
            logging.error(f"Exception in CreateBatch.create_json_files - {str(e)}")

    def prep_data(self):
        try:
            formatted_json_batch = []

            for x in range(len(self.json_batch)):
                element = self.json_batch[x]

                modified_element = "<logic to modify the element JSON to the needed format>"

                # sample modified element
                # {
                #     "id": "",
                #     "name": "",
                #     "address": {
                #         "region": "",
                #         "country": "",
                #         "language": "",
                #         "pin-code": ""
                #     }
                # }

                formatted_json_batch.append(json.dumps(modified_element))

            return formatted_json_batch
        except Exception as e:
            logging.error(f"Exception in CreateBatch.prep_data - {str(e)}")

    def process(self, record):
        try:
            self.project = "<project>"
            self.region = "<region>"
            self.batch_size = 100
            self.data_bucket = "my_bucket"
            self.json_folder = "folder_to_store_json_files"

            if len(self.json_batch) < self.batch_size:
                self.json_batch.append(record)
            else:
                self.batch_id = self.batch_id + 1

                file_name = self.get_file_name()

                # prepare for push
                self.json_batch = self.prep_data()

                logging.info(msg=f"batch id - {self.batch_id} - length of batch (process) - {str(len(self.json_batch))} - file name - {file_name}")

                # write to JSON
                self.create_json_files(file_name)

                self.json_batch = []

                self.json_batch.append(record)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.process - {str(e)}")

    def finish_bundle(self):
        try:
            self.batch_id = self.batch_id + 1
            if len(self.json_batch) > 0:

                file_name = self.get_file_name()

                # prepare for push
                self.json_batch = self.prep_data()

                logging.info(msg=f"batch id - {self.batch_id} - length of batch (finish_bundle) - {str(len(self.json_batch))} - file name - {file_name}")

                # write to JSON
                self.create_json_files(file_name)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.finish_bundle - {str(e)}")


if __name__ == "__main__":
    create_batch = CreateJSONBatch()
    create_batch.execute_pipeline()

I don't understand why "finish_bundle" is being called multiple times.

What modifications to my pipeline code would make the files get created with the given batch size?

EDIT: I tried running the same program with the "DirectRunner", and it created the correct number of files.


1 Answer

Anonymous user

When a PCollection is processed, its elements are split into an arbitrary number of bundles, and these bundles are then executed (typically concurrently across many workers, although a single worker may also receive several bundles in sequence). start_bundle and finish_bundle are then called once for each of these individual bundles. See https://beam.apache.org/documentation/runtime/model/ for more details.
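
If the goal is batches of exactly 100 records regardless of how the runner splits the input into bundles, the batching has to be done explicitly in the pipeline rather than in per-bundle DoFn state. Below is a minimal sketch of one way to do that with Beam's built-in GroupIntoBatches transform; the names run, output_prefix and WriteBatchToGcs are illustrative rather than taken from the question, and the file naming uses a UUID instead of the original timestamp scheme.

import json
import uuid

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class WriteBatchToGcs(beam.DoFn):
    """Writes one batch (a list of BigQuery row dicts) to a single JSON-lines file in GCS."""

    def __init__(self, output_prefix):
        # output_prefix is assumed to look like "gs://my_bucket/folder_to_store_json_files"
        self.output_prefix = output_prefix

    def process(self, keyed_batch):
        _, rows = keyed_batch  # the key only exists to satisfy GroupIntoBatches
        path = f"{self.output_prefix}/jbatch_{uuid.uuid4().hex}.json"
        writer = FileSystems.create(path)
        try:
            # default=str guards against values (e.g. dates) that json.dumps cannot serialise directly
            writer.write("\n".join(json.dumps(row, default=str) for row in rows).encode("utf-8"))
        finally:
            writer.close()
        yield path


def run(pipeline, query, output_prefix, batch_size=100):
    return (
        pipeline
        | 'Read from BQ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'Add dummy key' >> beam.Map(lambda row: (0, row))       # a single key -> global batching
        | 'Batch records' >> beam.GroupIntoBatches(batch_size)    # emits lists of up to batch_size rows
        | 'Write batch files' >> beam.ParDo(WriteBatchToGcs(output_prefix))
    )

With a single dummy key, 700 rows produce seven batches of 100 each (a smaller final batch appears only when the total is not a multiple of 100), independent of bundle boundaries. The trade-off is that a single key means the batching step itself is not parallelised, which is fine for 700 rows but worth keeping in mind for much larger inputs.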