我们正在尝试使用合流模式注册表创建avro记录。我们要发布到集群kafka相同记录。
要将模式id附加到每个记录(魔法字节),我们需要使用--to_avro(列数据,列主题,字符串schemaAuthstryAddress)
为了实现自动化,我们需要在管道中构建项目
现在我们在笔记本电脑中遇到的问题是,我们能够找到一个带有3个参数的方法。
但是,当我们在构建中使用从https://mvnrepository.com/artifact/org.apache.spark/spark-avro_2.12/3.1.2下载的同一个库时,它只有2个重载的方法to_avro
数据库是否有其他maven存储库用于其阴影罐子?
笔记本输出
import org.apache.spark.sql.avro.functions
println(functions.getClass().getProtectionDomain().getCodeSource().getLocation())
// file:/databricks/jars/----workspace_spark_3_1--vendor--avro--avro_2.12_deploy_shaded.jar
functions
.getClass()
.getMethods()
.filter(p=>p.getName.equals("to_avro"))
.foreach(f=>println(f.getName, f.getParameters.mkString("Array(", ", ", ")")))
// (to_avro,Array(final org.apache.spark.sql.Column data, final org.apache.spark.sql.Column subject, final java.lang.String schemaRegistryAddress, final java.lang.String jsonFormatSchema))
// (to_avro,Array(final org.apache.spark.sql.Column data, final org.apache.spark.sql.Column subject, final java.lang.String schemaRegistryAddress))
// (to_avro,Array(final org.apache.spark.sql.Column data, final java.lang.String jsonFormatSchema))
// (to_avro,Array(final org.apache.spark.sql.Column data))
本地输出
import org.apache.spark.sql.avro.functions
println(functions.getClass().getProtectionDomain().getCodeSource().getLocation())
// file:/<home-dir-path>/.gradle/caches/modules-2/files-2.1/org.apache.spark/spark-avro_2.12/3.1.2/1160ae134351328a0ed6a062183faf9a0d5b46ea/spark-avro_2.12-3.1.2.jar
functions
.getClass()
.getMethods()
.filter(p=>p.getName.equals("to_avro"))
.foreach(f=>println(f.getName, f.getParameters.mkString("Array(", ", ", ")")))
// (to_avro,Array(final org.apache.spark.sql.Column data, final java.lang.String jsonFormatSchema))
// (to_avro,Array(final org.apache.spark.sql.Column data))
版本
数据库=
不幸的是,我们没有支持DBR中功能的可共享jar。有一个特性请求将其包含在DBConnect中;但是它没有实现,因为我们没有足够的赞成票来实现该特性。
由于您的用例是自动创建Jar文件,然后将其作为Job提交到Database ricks中,因此我们应该能够使用带有三个参数的to_avro()
函数的虚拟实现创建一个jar存根(dbr-avro-dumy. jar
),并将此jar作为依赖项来欺骗编译器的实际Jar(对于Job)。
这将在构建Jar时避免编译错误,并且在运行时,因为它在Database ricks环境上运行,它将从DBR中选择实际的avroJar
您可以使用下面的包代码构建虚拟Jar存根:(您将使用maven/sbt火花/scala依赖项作为列函数)
package org.apache.spark.sql
import java.net.URL
package object avro {
def from_avro(data: Column, key: String, schemaRegistryURL: URL): Column = {
new Column("dummy")
}
}
不,这些罐子没有发布到任何公共存储库。您可以检查数据库连接
是否提供了这些罐子(您可以使用数据库连接get-jar-dir
获取它们的位置),但我真的对此表示怀疑。
另一种方法是模拟它,例如,创建一个小库,该库将声明一个具有特定签名的函数,并仅将其用于编译,而不是包含到生成的jar中。