提问者:小点点

Spark执行-访问驱动程序和执行程序中文件内容的单一方法


根据这个问题---files选项在pyspark不工作sc. addFiles选项应该用于访问驱动程序和执行程序中的文件。但是我不能让它在执行程序上工作

test.py

from pyspark import SparkContext, SparkConf
from pyspark import SparkFiles

conf = SparkConf().setAppName("File access test")
sc = SparkContext(conf=conf)
sc.addFile("file:///home/hadoop/uploads/readme.txt")

with open(SparkFiles.get('readme.txt')) as test_file:
    lines = [line.strip() for line in test_file]
print(lines) # this works
print('********************')
lines = sc.textFile(SparkFiles.get('readme.txt')) # run in the executors. this errors
print(lines.collect())

命令

spark-submit --master yarn --deploy-mode client test.py

readme. txt在主节点的/home/hadoop/上传

我在日志中看到以下内容

21/01/27 15:03:30 INFO SparkContext: Added file file:///home/hadoop/uploads/readme.txt at spark://ip-10-133-70-121.sysco.net:44401/files/readme.txt with timestamp 1611759810247
21/01/27 15:03:30 INFO Utils: Copying /home/hadoop/uploads/readme.txt to /mnt/tmp/spark-f929a1e2-e7e8-401e-8e2e-dcd1def3ee7b/userFiles-fed4d5bf-3e31-4e1e-b2ae-3d4782ca265c/readme.txt

所以它将其复制到某个火花目录并挂载(我对火花世界还比较陌生)。如果我使用--files标志并传递文件,它还会将其复制到执行程序可以读取的hdfs://路径。

这是因为addFile要求文件也存在于本地的执行器上。目前readme. txt在主节点上。如果是这样,有没有办法将其从主节点传播到执行器。

我试图找到一种访问文件的统一方式。我能够将文件从本地机器推送到主节点。然而,在火花代码中,我想要一种访问文件内容的单一方式,无论是驱动程序还是执行程序

目前,为了使代码的执行器部分正常工作,我还必须将文件传递到--files标志中(spack-臣服--master yarn--部署模式客户端--files上传/readme. txttest.py)并使用如下内容

path = f'hdfs://{sc.getConf().get("spark.driver.host")}:8020/user/hadoop/.sparkStaging/{sc.getConf().get("spark.app.id")}/readme.txt'
lines = sc.textFile(path)

共3个答案

匿名用户

您可以做到这一点的一种方法是将代码文件放在s3存储桶上,然后指向火花提交中的文件位置。在这种情况下,所有工作节点都将从s3获得相同的文件。

确保您的EMR节点有权访问该s3存储桶。

匿名用户

如果您使用的是Jupter Notebook,您可以使用下面的代码片段将工件从本地火花执行器可见路径写入HDFS。

from pyspark import SparkContext 

sc = SparkContext.getOrCreate()
filesystem = sc._jvm.org.apache.hadoop.fs.FileSystem
fs = filesystem.get(sc._jsc.hadoopConfiguration())
Path = sc._jvm.org.apache.hadoop.fs.Path

doc_name = "readme.txt"

# Copying Executor -> HDFS
fs.copyFromLocalFile(
    False, # Don't delete local file
    True,  # Overwrite dest file
    Path(doc_name), # src
    Path(doc_name)  # dst
)
print("My HDFS file path is...\n", fs.getWorkingDirectory() + "/" + doc_name, "\n");

然后使用以下CLI命令从HDFS复制到Jupyter服务器可见路径;

%%bash
# Copy HDFS to Local FS
hdfs dfs -copyToLocal -f "hdfs://<name_node>:8020/user/<user>/readme.txt" .

还没有在EMR上测试过这个,但是在本地设置中YARN集群可以正常工作。

匿名用户

您可以使用--档案在驱动程序和执行程序之间共享您的文件。

将您的存档以以下格式保存在s3中。

references.zip 
 |_file1.txt
 |_file2.txt
 |_reference.ini


spark-submit --deploy-mode cluster --master yarn --archives s3://bucket/references.zip#references s3://bucket/spark_script.py

在这里使用#引用将解压缩引用/目录下的所有文件。

您可以在执行器/驱动程序中访问如下文件:

with open('references/file1.txt') as f:
    data1 = f.read()

config = configparser.ConfigParser()
config.read('references/reference.ini')