我想以指定的格式从Eventhub摄取流数据到ADLS gen2。
我做了批量数据摄取,从DB到ADLS和容器到容器,但现在我想尝试流式数据摄取。
你能指导我从哪里开始继续下一步吗?我确实在Azure中创建了Eventhub、Database rick实例和存储帐户。
您只需要遵循EventHubs Spark连接器的留档(对于Scala,对于Python)。代码最简单的方式如下(对于Python):
readConnectionString = "..."
ehConf = {}
# this is required for versions 2.3.15+
ehConf['eventhubs.connectionString']=sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)
df = spark.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
# casting binary payload to String (but it's really depends on the
# data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))
# write data to storage
stream = cdf.writeStream.format("delta")\
.outputMode("append")\
.option("checkpointLocation", "/path/to/checkpoint/directory")\
.start("ADLS location")
您可能需要添加其他选项,例如起始位置等,但所有内容都在留档中描述得很好。