根据这个问题---files选项在pyspark不工作sc. addFiles选项应该用于访问驱动程序和执行程序中的文件。但是我不能让它在执行程序上工作
test.py
from pyspark import SparkContext, SparkConf
from pyspark import SparkFiles
conf = SparkConf().setAppName("File access test")
sc = SparkContext(conf=conf)
sc.addFile("file:///home/hadoop/uploads/readme.txt")
with open(SparkFiles.get('readme.txt')) as test_file:
lines = [line.strip() for line in test_file]
print(lines) # this works
print('********************')
lines = sc.textFile(SparkFiles.get('readme.txt')) # run in the executors. this errors
print(lines.collect())
命令
spark-submit --master yarn --deploy-mode client test.py
readme. txt
在主节点的/home/hadoop/上传
下
我在日志中看到以下内容
21/01/27 15:03:30 INFO SparkContext: Added file file:///home/hadoop/uploads/readme.txt at spark://ip-10-133-70-121.sysco.net:44401/files/readme.txt with timestamp 1611759810247
21/01/27 15:03:30 INFO Utils: Copying /home/hadoop/uploads/readme.txt to /mnt/tmp/spark-f929a1e2-e7e8-401e-8e2e-dcd1def3ee7b/userFiles-fed4d5bf-3e31-4e1e-b2ae-3d4782ca265c/readme.txt
所以它将其复制到某个火花目录并挂载(我对火花世界还比较陌生)。如果我使用--files标志并传递文件,它还会将其复制到执行程序可以读取的hdfs://路径。
这是因为addFile要求文件也存在于本地的执行器上。目前readme. txt
在主节点上。如果是这样,有没有办法将其从主节点传播到执行器。
我试图找到一种访问文件的统一方式。我能够将文件从本地机器推送到主节点。然而,在火花代码中,我想要一种访问文件内容的单一方式,无论是驱动程序还是执行程序
目前,为了使代码的执行器部分正常工作,我还必须将文件传递到--files标志中(spack-臣服--master yarn--部署模式客户端--files上传/readme. txttest.py
)并使用如下内容
path = f'hdfs://{sc.getConf().get("spark.driver.host")}:8020/user/hadoop/.sparkStaging/{sc.getConf().get("spark.app.id")}/readme.txt'
lines = sc.textFile(path)
您可以做到这一点的一种方法是将代码文件放在s3存储桶上,然后指向火花提交中的文件位置。在这种情况下,所有工作节点都将从s3获得相同的文件。
确保您的EMR节点有权访问该s3存储桶。
如果您使用的是Jupter Notebook,您可以使用下面的代码片段将工件从本地火花执行器可见路径写入HDFS。
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
filesystem = sc._jvm.org.apache.hadoop.fs.FileSystem
fs = filesystem.get(sc._jsc.hadoopConfiguration())
Path = sc._jvm.org.apache.hadoop.fs.Path
doc_name = "readme.txt"
# Copying Executor -> HDFS
fs.copyFromLocalFile(
False, # Don't delete local file
True, # Overwrite dest file
Path(doc_name), # src
Path(doc_name) # dst
)
print("My HDFS file path is...\n", fs.getWorkingDirectory() + "/" + doc_name, "\n");
然后使用以下CLI命令从HDFS复制到Jupyter服务器可见路径;
%%bash
# Copy HDFS to Local FS
hdfs dfs -copyToLocal -f "hdfs://<name_node>:8020/user/<user>/readme.txt" .
还没有在EMR上测试过这个,但是在本地设置中YARN集群可以正常工作。
您可以使用--档案在驱动程序和执行程序之间共享您的文件。
将您的存档以以下格式保存在s3中。
references.zip
|_file1.txt
|_file2.txt
|_reference.ini
spark-submit --deploy-mode cluster --master yarn --archives s3://bucket/references.zip#references s3://bucket/spark_script.py
在这里使用#引用将解压缩引用/目录下的所有文件。
您可以在执行器/驱动程序中访问如下文件:
with open('references/file1.txt') as f:
data1 = f.read()
和
config = configparser.ConfigParser()
config.read('references/reference.ini')