I am trying to create batches of JSON files, 100 records each, using an Apache Beam pipeline run as a Google Dataflow job.
I am reading the records from BigQuery and trying to create JSON files with 100 records per file, i.e. batch_size = 100.
So I expect 7 JSON files to be created when the Dataflow job reads 700 records from BQ, but I see more files than that being created, and the batch sizes are not what I expect.
I expected the finish_bundle method to execute only once, but I see it executing multiple times, creating JSON batch files with fewer than 100 records.
Below are the log details of the current Dataflow execution:
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (finish_bundle) - 43 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 2 - length of batch (finish_bundle) - 66 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json
batch id - 3 - length of batch (finish_bundle) - 61 - file name - my_bucket/29_09_2021/jbatch_20210929_093512.json
batch id - 1 - length of batch (finish_bundle) - 30 - file name - my_bucket/29_09_2021/jbatch_20210929_093525.json
I expect the batch JSON files to be created in the correct sequence, with a batch size of 100 records each, like this:
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 4 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 5 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 6 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 7 - length of batch (finish_bundle) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json
Here is the pipeline code that creates the JSON batch files and stores them in the GCS bucket:
import os
import json
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class CreateJSONBatch:
    def execute_pipeline(self):
        try:
            query = "SELECT id, name, region, country, language, `pin-code` FROM `project.dataset.table` LIMIT 700"
            beam_options = {
                "project": "<project>",
                "region": "<region>",
                "job_name": "create_json_batch",
                "runner": "DataflowRunner",
                "temp_location": "gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            with beam.Pipeline(options=options) as pipeline:
                raw_data = (
                    pipeline
                    | 'Read from BQ ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
                )
                _ = (raw_data | 'Create JSON batch files ' >> beam.ParDo(CreateBatch()))
        except Exception as e:
            logging.error(f"Exception in execute_pipeline - {str(e)}")


class CreateBatch(beam.DoFn):
    def __init__(self):
        self.project = None
        self.region = None
        self.batch_size = None
        self.data_bucket = None
        self.json_folder = None
        self.batch_id = 0
        self.json_batch = []

    def get_file_name(self):
        try:
            cur_time = datetime.now()
            date_folder = f"{cur_time.strftime('%d_%m_%Y')}"
            file_name = cur_time.strftime('%Y%m%d_%H%M%S')
            file_name = os.path.join(self.data_bucket, self.json_folder, date_folder, f"jbatch_{file_name}.json")
            return file_name  # file_name - my_bucket/folder_to_store_json_files/29_09_2021/jbatch_20210929_060346.json
        except Exception as e:
            logging.error(f"Exception in CreateBatch.get_file_name - {str(e)}")

    def create_json_files(self, json_file):
        try:
            json_file = f"gs://{json_file}"
            beam_options = {
                "project": self.project,
                "region": self.region,
                "runner": "DataflowRunner",
                "temp_location": "gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            with beam.Pipeline(options=options) as pipeline_for_json:
                data = (
                    pipeline_for_json
                    | 'Create pcollection' >> beam.Create(self.json_batch)
                    | 'Write Output' >> beam.io.WriteToText(json_file, shard_name_template='')
                )
        except Exception as e:
            logging.error(f"Exception in CreateBatch.create_json_files - {str(e)}")

    def prep_data(self):
        try:
            formatted_json_batch = []
            for x in range(len(self.json_batch)):
                element = self.json_batch[x]
                modified_element = "<logic to modify the element JSON to the needed format>"
                # sample modified element
                # {
                #     "id": "",
                #     "name": "",
                #     "address": {
                #         "region": "",
                #         "country": "",
                #         "language": "",
                #         "pin-code": ""
                #     }
                # }
                formatted_json_batch.append(json.dumps(modified_element))
            return formatted_json_batch
        except Exception as e:
            logging.error(f"Exception in CreateBatch.prep_data - {str(e)}")

    def process(self, record):
        try:
            self.project = "<project>"
            self.region = "<region>"
            self.batch_size = 100
            self.data_bucket = "my_bucket"
            self.json_folder = "folder_to_store_json_files"
            if len(self.json_batch) < self.batch_size:
                self.json_batch.append(record)
            else:
                self.batch_id = self.batch_id + 1
                file_name = self.get_file_name()
                # prepare for push
                self.json_batch = self.prep_data()
                logging.info(msg=f"batch id - {self.batch_id} - length of batch (process) - {str(len(self.json_batch))} - file name - {file_name}")
                # write to JSON
                self.create_json_files(file_name)
                self.json_batch = []
                self.json_batch.append(record)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.process - {str(e)}")

    def finish_bundle(self):
        try:
            self.batch_id = self.batch_id + 1
            if len(self.json_batch) > 0:
                file_name = self.get_file_name()
                # prepare for push
                self.json_batch = self.prep_data()
                logging.info(msg=f"batch id - {self.batch_id} - length of batch (finish_bundle) - {str(len(self.json_batch))} - file name - {file_name}")
                # write to JSON
                self.create_json_files(file_name)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.finish_bundle - {str(e)}")


if __name__ == "__main__":
    create_batch = CreateJSONBatch()
    create_batch.execute_pipeline()
I don't understand why finish_bundle is being called multiple times.
What changes in my pipeline code would make the files get created with the given batch size?
EDIT: I tried executing the same program with the DirectRunner, and it created the correct number of files.
When a PCollection is processed, its elements are divided into an arbitrary number of bundles, and those bundles are then executed (usually concurrently on many workers, though a single worker may also receive several bundles in sequence). start_bundle and finish_bundle are invoked once for each of these individual bundles, which is why finish_bundle runs more than once and flushes batches with fewer than 100 records. See https://beam.apache.org/documentation/runtime/model/ for more details.
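If you want batches of exactly 100 regardless of how the runner splits bundles, one option is to let Beam do the batching with GroupIntoBatches, which keeps per-key state across bundles, and only write a file once a full batch has been assembled. Below is a minimal sketch of that idea, not your exact pipeline: the query, the OUTPUT_PREFIX path and the write_batch helper are placeholders you would replace with your own formatting logic and PipelineOptions.

import json
import uuid

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

BATCH_SIZE = 100
OUTPUT_PREFIX = "gs://my_bucket/folder_to_store_json_files"  # placeholder bucket/folder


def write_batch(keyed_batch):
    # keyed_batch is (key, iterable of rows) coming out of GroupIntoBatches.
    # Assumes the rows are JSON-serializable dicts; plug your prep_data logic in here.
    _, rows = keyed_batch
    path = f"{OUTPUT_PREFIX}/jbatch_{uuid.uuid4().hex}.json"
    writer = FileSystems.create(path)
    writer.write("\n".join(json.dumps(row) for row in rows).encode("utf-8"))
    writer.close()
    return path


with beam.Pipeline() as pipeline:  # pass your PipelineOptions here
    _ = (
        pipeline
        | "Read from BQ" >> beam.io.ReadFromBigQuery(query="SELECT ... LIMIT 700",
                                                     use_standard_sql=True)
        | "Add constant key" >> beam.Map(lambda row: (0, row))    # one key -> one batching stream
        | "Batch into 100s" >> beam.GroupIntoBatches(BATCH_SIZE)  # stateful, survives bundle splits
        | "Write one file per batch" >> beam.Map(write_batch)
    )

With a single global window and a single constant key, GroupIntoBatches should emit only full batches of 100 except for the final one, so 700 input rows give 7 files. The constant key does funnel the batching through one worker; if that becomes a bottleneck you can key by something like hash(row["id"]) % num_shards and accept one possibly short final batch per shard.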