我使用Spark Dataframe API从NFS共享加载/读取文件,然后将该文件的数据保存/写入HDFS。
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object TestMoreThan1CSV2DF {
private val source: String = "file:///data/files/"
private val destination = "hdfs://<myHostIP>:8020/raw/"
private val fileFormat : String = "com.databricks.spark.csv"
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))
for(file<-fileArray){
// reading csv file from shared location and taking whole data in a dataframe
var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
// variable for holding destination location : HDFS Location
var finalDestination: String = destination+file.getName
// saving data into HDFS
writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
}
}
def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
try{
sqlContext.read.format(fileFormat)
.option("header", header) // Use first line of all files as header
.option("inferSchema", inferSchema) // Automatically infer data types
.load(source)
}
catch{
case ex: OnboardingException => {
throw ex;
}
}
}
def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
try{
df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
}
catch{
Case ez : OnboardingException => {
throw ez;
}
}
}
}
1)整个代码在哪里运行?它是在驱动程序上运行?还是同时使用两个worker?
2)load()和save()API是否在worker节点上运行,它是否并行工作?如果是,那么两个worker如何跟踪while的读写部分?
3)到现在为止,我在“for”循环中顺序地读取每个文件,并顺序地处理每个文件,有没有可能使它成为一个多线程应用程序,其中每个文件被分配给一个线程,以并行地执行端到端的读写。在这样做的时候,磁盘IO会有任何限制吗?
从另一个线程复制了一个很好的解释,用于我的查询:区分Apache Spark中的驱动程序代码和工作代码
在这里也复制它的一部分:转换创建的闭包中发生的所有事情都发生在worker上。它的意思是,如果在map(...)、filter(...)、mapPartitions(...)、groupby*(...)、aggregateby*(...)中传递了一些内容对工人执行死刑。它包括从持久存储或远程源读取数据。
像计数、减少(...)、折叠(...)这样的操作通常在司机和工人身上执行。沉重的起重是由工人并行执行的,一些最后的步骤,如减少从工人那里收到的输出,是在司机上依次执行的。
3)还在努力做到这一点,一旦做到就会回答这一点。
谢谢