我有一个Mongo更改流(一个pymongo应用程序),它不断获取集合中的更改。程序收到的这些更改文档被发送到Azure事件中心。Spark笔记本必须在文档进入事件中心时读取它们,并与该集合的火花表进行模式匹配(将文档中的字段与火花表列匹配)。如果文档中的字段少于表中的字段,则必须使用Null添加列。
我正在阅读事件中心的事件,如下所示。
spark.readStream.format("eventhubs").option(**config).load().
正如在留档中所说,原始消息在我正在转换为字符串的数据帧的“正文”列中。现在我已经将Mongo文档作为流数据帧中的JSON字符串。我面临以下问题。
我需要提取mongo文档中的各个字段。这是比较火花表中存在的字段和Mongo文档中不存在的字段所必需的。我看到了一个名为get_json_object的函数。这本质上再次返回一个字符串,我不能单独选择所有列。
如果from_json可用于将JSON字符串转换为结构类型,我不能指定Schema,因为我们有近70个集合(相应数量的火花表),每个集合发送Mongo文档,字段从10到450。
如果我可以将流数据帧中的JSON字符串转换为一个JSON对象,其模式可以由数据帧推断(类似read. json可以做到的),我可以使用SQL*表示来提取单个列,做一些操作
注意:Stram DF不支持从底层rdd单独提取JSON字符串并进行必要的列比较的收集()方法。使用Spark 2.4
下面是我从事件中心读取事件并将其转换为字符串后在笔记本中获得的示例数据。
{
"documentKey": "5ab2cbd747f8b2e33e1f5527",
"collection": "configurations",
"operationType": "replace",
"fullDocument": {
"_id": "5ab2cbd747f8b2e33e1f5527",
"app": "7NOW",
"type": "global",
"version": "1.0",
"country": "US",
"created_date": "2018-02-14T18:34:13.376Z",
"created_by": "Vikram SSS",
"last_modified_date": "2018-07-01T04:00:00.000Z",
"last_modified_by": "Vikram Ganta",
"last_modified_comments": "Added new property in show_banners feature",
"is_active": true,
"configurations": [
{
"feature": "tip",
"properties": [
{
"id": "tip_mode",
"name": "Delivery Tip Mode",
"description": "Tip mode switches the display of tip options between percentage and amount in the customer app",
"options": [
"amount",
"percentage"
],
"default_value": "tip_percentage",
"current_value": "tip_percentage",
"mode": "multiple or single"
},
{
"id": "tip_amount",
"name": "Tip Amounts",
"description": "List of possible tip amount values",
"default_value": 0,
"options": [
{
"display": "No Tip",
"value": 0
}
]
}
]
}
]
}
}
我想分离并取出上面示例中的full_document
。当我使用get_json_object时,我在另一个流式数据帧中获取full_documentJSON字符串,而不是对象。如您所见,full_document中有一些数组类型可以爆炸(留档说流式DF支持爆炸,但还没有尝试过),但也有一些对象(如struct类型)我想提取单个字段。我不能使用SQL'*'表示法,因为get_json_object返回的是字符串而不是对象本身。
令人信服的是,JSON的这种多种多样的模式在明确提及模式的情况下会更好。所以我认为,在传入流的模式非常不同的流式环境中,指定模式总是更好的。所以我继续get_json_object和from_json,并通过文件读取模式。