Asked by: 小点点

BatchWrite in AWS DynamoDB skips some items


I am trying to write Items to an AWS DynamoDB table using the Node SDK. The problem I am facing is that when I write batches of items to AWS in parallel using worker threads, some of the items are not written to the database, and the number of items written is random. For example, if I run the code 3 times, one run writes 150 items, the next 200, and the third maybe 135. Even when I write the items sequentially, without threads, some items are still missing, though fewer of them: for example, if the total number of items is 300, 298 get written. I investigated whether there were any unprocessed items, but the batchWrite method returns nothing, which suggests all items were processed correctly. Note that the tables involved use OnDemand capacity, so I don't expect any throttling issues. Here is my code.

 exports.run = async function() {

  // This is the function which runs first !!!!!

  const data = await getArrayOfObjects();
  console.log("TOTAL PRICE CHANGES")  
  console.log(data.length)
  const batchesOfData = makeBatches(data)
  const threads = new Set();
  console.log("**********")
  console.log(batchesOfData.length)
  console.log("**********")
  for(let i = 0; i < batchesOfData.length; i++) {
    console.log("BATCH!!!!!")
    console.log(i)
    console.log(batchesOfData[i].length)  
    // Sequential Approach
    const response = await compensationHelper.createItems(batchesOfData[i])
    console.log("RESPONSE")
    console.log(response)

    // Parallel approach
    // const workerResult = await runService(batchesOfData[i])
    // console.log("WORKER RESUULT!!!!")
    // console.log(workerResult);

  }
}

exports.updateItemsInBatch = async function(data, tableName) {
  console.log("WRITING DATA")
  console.log(data.length)
  const batchItems = {
    RequestItems: {},
  };

  batchItems.RequestItems[tableName] = data;
  try {
    const result = await documentClient.batchWrite(batchItems).promise();
    console.log("UNPROCESSED ITEMS")
    console.log(result)
    if (result instanceof Error) {
      console.log(`[Error]: ${JSON.stringify(Error)}`);
      throw new Error(result);
    }
    return Promise.resolve(true);
  } catch (err) {
    console.error(`[Error]: ${JSON.stringify(err.message)}`);
    return Promise.reject(new Error(err));
  }
};

exports.convertToAWSCompatibleFormat = function(data) {
  const awsCompatibleData = [];
  data.forEach(record => awsCompatibleData.push({ PutRequest: { Item: record } }));
  return awsCompatibleData;
};

const createItems = async function(itemList) {
  try {
    const objectsList = [];
    for (let index = 0; index < itemList.length; index++) {
      try {
        const itemListObj = itemList[index];
        const ObjToBeInserted = {
          // some data assignments here
        };

        objectsList.push(ObjToBeInserted);
        if (
          objectsList.length >= AWS_BATCH_SIZE ||
          index === itemList.length - 1
        ) {
            const awsCompatiableFormat = convertToAWSCompatibleFormat(
              objectsList
            );
            await updateItemsInBatch(
              awsCompatiableFormat,
              process.env.myTableName
            );
        }
      } catch (error) {
        console.log(`[Error]: ${JSON.stringify(error)}`);
      }
    }

    return Promise.resolve(true);
  } catch (err) {
    return Promise.reject(new Error(err));
  }
};

const makeBatches = products => {
  const productBatches = [];
  let countr = -1;
  for (let index = 0; index < products.length; index++) {
    if (index % AWS_BATCH_SIZE === 0) {
      countr++;
      productBatches[countr] = [];
      if (countr === MAX_BATCHES) {
        break;
      }
    }
    try {
      productBatches[countr].push(products[index]);
    } catch (error) {
      continue;
    }
  }
  return productBatches;
};

async function runService(workerData) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, './worker.js'), { workerData });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    })
  })
}

// My worker file
'use strict';

const { workerData, parentPort } = require('worker_threads')
const createItems = require('myscripts')
// You can do any heavy stuff here, in a synchronous way
// without blocking the "main thread"
console.log("I AM A NEW THREAD")
createItems(workerData)
// console.log('Going to write tons of content on file '+workerData);
parentPort.postMessage({ fileName: workerData, status: 'Done' })

3 Answers

Anonymous

From the boto3 documentation:

DynamoDB rejects the entire batch write operation if one or more of the following is true:

One or more tables specified in the BatchWriteItem request does not exist.
Primary key attributes specified on an item in the request do not match those in the corresponding table's primary key schema.
You try to perform multiple operations on the same item in the same BatchWriteItem request. For example, you cannot put and delete the same item in the same BatchWriteItem request.
Your request contains at least two items with identical hash and range keys (which essentially is two put operations).
There are more than 25 requests in the batch.
Any individual item in a batch exceeds 400 KB.
The total request size exceeds 16 MB.

To me it looks like some of these may apply in your case. At my job we also hit a problem where a batch contained 2 items with identical primary and sort keys, so the whole batch was dropped. I know this is not Node.js, but this is what we used to get around it.

It is batch_writer(overwrite_by_pkeys), which keeps only the last occurrence of each primary-key/sort-key combination within a batch, overwriting earlier duplicates. You can use it if only a small portion of your data is duplicated and you don't need to keep those duplicates. But if you need to keep all of the data, I would not recommend this feature.
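There is no direct Node.js equivalent of overwrite_by_pkeys, but the same effect can be approximated by de-duplicating each batch on its key attributes before calling batchWrite. A minimal sketch, assuming a table keyed on id (partition) and createdAt (sort); the helper name and both key names are placeholders, not from the original post:

// Keep only the last occurrence of each partition/sort key pair in a batch,
// so BatchWriteItem does not reject the request for duplicate keys.
function dedupeByKeys(putRequests) {
  const seen = new Map();
  for (const request of putRequests) {
    const item = request.PutRequest.Item;
    const key = `${item.id}#${item.createdAt}`; // adjust to your table's key schema
    seen.set(key, request); // later occurrences overwrite earlier ones
  }
  return Array.from(seen.values());
}

// Usage: dedupe each batch before passing it to updateItemsInBatch
// const cleanBatch = dedupeByKeys(awsCompatiableFormat);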

Anonymous

I don't see where you check the response for UnprocessedItems. Batch operations routinely return a list of unprocessed items that you are expected to retry. As the documentation states, BatchWriteItem "can write up to 16 MB of data, which can comprise as many as 25 put or delete requests."
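A minimal sketch of how updateItemsInBatch could retry UnprocessedItems; the UnprocessedItems field is part of the documented batchWrite response, while the while-loop and the 500 ms back-off are illustrative assumptions:

const AWS = require('aws-sdk');
const documentClient = new AWS.DynamoDB.DocumentClient();

exports.updateItemsInBatch = async function(data, tableName) {
  let requestItems = { [tableName]: data };
  // Keep retrying whatever DynamoDB reports back as unprocessed.
  while (requestItems && Object.keys(requestItems).length > 0) {
    const result = await documentClient
      .batchWrite({ RequestItems: requestItems })
      .promise();
    requestItems = result.UnprocessedItems;
    if (requestItems && Object.keys(requestItems).length > 0) {
      // Illustrative back-off before retrying; tune or use exponential back-off.
      await new Promise(resolve => setTimeout(resolve, 500));
    }
  }
  return true;
};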

Anonymous

I had a duplicate-key problem, meaning the partition key and sort key had duplicate values within a batch. Surprisingly, in my case the AWS BatchWrite method did not return any error for this when my timestamps had fractional seconds, e.g. 2020-02-09T08:02:36.71. I solved it by making createdAt (my sort key) more granular, as shown below.
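The code that followed was not included in the post; a hedged sketch of one way to make a createdAt sort key more granular — the helper name and the random suffix are assumptions, not from the original answer:

// Build a more granular createdAt value so two items created within the same
// fraction of a second do not collide on the sort key.
// The random suffix is an illustrative assumption to guarantee uniqueness.
function granularCreatedAt() {
  const isoWithMillis = new Date().toISOString(); // e.g. 2020-02-09T08:02:36.713Z
  const suffix = Math.random().toString(36).slice(2, 8);
  return `${isoWithMillis}#${suffix}`;
}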