提问者:小点点

使用scala从EventHub读取数据到DataBricks中的表


我正在尝试从事件中心读取数据到数据库,并想给它一个结构作为col1、col2等。

问题-我看到只有第一条记录以正确的结构出现,并且没有加载其余的数据。

机构24,5300,123456,1,计划-QD,PMT,10/09/15 00:00,1253323,利息,贷款-AS,NULL 32,1300,12458,2,计划,PMT,25/09/15 00:00,12532123,利息,贷款,NULL 36,1400,19458,25,计划,PMTS,25/11/15 00:00,92532163,利息,贷款-DS,NULL

列的标题是(不存在于evnt中心,仅供参考,但在表的输出中,它们应该存在)-

我的代码如下——

导入_

导入org. apache.park.sql.type._

导入org. apache.火花.sql.函数。_

导入_

val ConnectionString=ConnectionStringBuilder("我的连接字符串"). setEventHubName("oth事务").build//要从eventhub读取的此连接字符串

val自定义EventhubParameters=EventHubsConf(连接字符串)

val消费者DF=火花. readStream.format("eventhubs").选项(自定义EventhubParameters.toMap).选项("checkpoint Site","/tmp/checkpoint").load()

VAL OTHDF=ConsumerDF.select($"body"转换为"string")

VAL OTHDF2=OTHDF. with列("temp",拆分(coll("body"), "\,")).select(

(0 until 52).map(i => col("temp").getItem(i).as(s"col$i")): _*

)

OTHDF2. printSchema

OTHDF2. write eStream.format("delta").outputMode("append").option("checkpoint Place","/delta/event/_checkpoints/etl-from-json").table("Table_Name")

我可以得到一些关于如何读取所有这些记录并将其放入数据块中的表中的建议吗?

提前感谢!!


共1个答案

匿名用户

您正在使用“\,”而不是“,”来拆分主体列,请尝试一下。

val OTHDF2 = OTHDF.withColumn("temp", split(col("body"), ","))
               .select($"_tmp".getItem(0).as("id")
                      ,$"_tmp".getItem(1).as("Bal")
                      ,$"_tmp".getItem(2).as("accnum")
                      ,$"_tmp".getItem(3).as("active")
                      ,$"_tmp".getItem(4).as("plan")
                      ,$"_tmp".getItem(5).as("Status")
                      ,$"_tmp".getItem(6).cast("timestamp").as("DateTime")
                      ,$"_tmp".getItem(7).as("Type")
                      ,$"_tmp".getItem(8).as("Loan")
                      ,$"_tmp".getItem(9).as("Where")
                     )
               .drop("_tmp")
               .writeStream
               .format("csv")
               .outputMode("append")
               .option("checkpointLocation", "/FileStore/checkpointLocation.csv")
               .option("path", "/FileStore/data.csv")
               .start()