使用结构化流pyspark中记录的步骤,我能够创建连接但无法读取数据。下面是获取的错误
错误:
Stream stopped...
java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.EventHubException
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
at org.apache.spark.sql.eventhubs.EventHubsSource.partitionCount(EventHubsSource.scala:81)
at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$4(EventHubsSource.scala:96)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$2(EventHubsSource.scala:96)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:100)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:197)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:196)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:165)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:165)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:349)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: com.microsoft.azure.eventhubs.EventHubException
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:74)
at com.microsoft.azure.eventhubs.impl.ReceiveLinkHandler.onLinkFinal(ReceiveLinkHandler.java:81)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:182)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
代码:
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Event Hub Namespace Name
NAMESPACE_NAME = "<>"
KEY_NAME = "<>"
KEY_VALUE = "<>"
# The connection string to your Event Hubs Namespace
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=testing".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)
ehConf = {}
# ehConf['eventhubs.connectionString'] = connectionString
# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
read_df = spark.readStream.format("eventhubs").options(**ehConf).load()
# defining the schema
read_schema = StructType([
StructField("Color", StringType(), True),
StructField("Value", StringType(), True)]
)
#capturating in dataframe
decoded_df = read_df.select(F.from_json(F.col("body").cast("string"), read_schema).alias("payload"))
#To view the stream
query1=decoded_df.writeStream.format("json").option("path","/user/sh/JsonStream/").option("checkpointLocation","/user/sh/EventHubCheckPoint/").queryName("read_hub").start()
请帮帮我,哪里出了问题
注意:我已经添加了com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.20,并使用DBR 8.4和火花火花3.1.2和scala2.12
这很可能是由不正确的依赖项引起的-对于DBR 7. x