我使用三角洲湖("io.delta%%"delta-core"%"0.4.0")和合并在foreachBatch喜欢:
foreachBatch { (s, batchid) =>
deltaTable.alias("t")
.merge(
s.as("s"),
"s.eventid = t.eventid and t.categories in ('a1', 'a2')")
.whenMatched("s.eventtime < t.eventtime").updateAll()
.whenNotMatched().insertAll()
.execute()
}
增量表按类别进行分区。如果我在('a1','a2')'中添加分区过滤器,如'和t.类别,从火花图中我可以看到输入不是整个表。我认为它做了分区修剪。然而,如果我这样做:“s. eventid=t.eventid和t.分类=s.分类”,它仍然会加载增量表中的所有数据。我希望它可以自动感知应该去哪些分区进行连接,有点像下推。有没有可能在不指定特定分区值的情况下进行分区修剪?我也尝试添加(“火花.数据库.优化器.动态分区修剪”,“true”)但不起作用。
谢啦
您可以通过两种方式传递它。一种是传递值的静态方式,另一种是您在merge语句中动态设置分区。
val categoriesList = List("a1", "a2")
val catergoryPartitionList = categoriesList.mkString("','")
foreachBatch { (s, batchid) =>
deltaTable.alias("t")
.merge(
s.as("s"),
"s.eventid = t.eventid and t.categories in ('$catergoryPartitionList')")
.whenMatched("s.eventtime < t.eventtime").updateAll()
.whenNotMatched().insertAll()
.execute()
}
val selectedCategories = deltaTable.select("categories").dropDuplicates()
val categoriesList = selectedCategories.map(_.getString(0)).collect()
val catergoryPartitionList = categoriesList.mkString("','")
foreachBatch { (s, batchid) =>
deltaTable.alias("t")
.merge(
s.as("s"),
"s.eventid = t.eventid and t.categories in ('$catergoryPartitionList')")
.whenMatched("s.eventtime < t.eventtime").updateAll()
.whenNotMatched().insertAll()
.execute()
}