提问者:小点点

Spark集群上sqlContext.read...load()和sqlContext.write...save()代码在哪里运行?


我使用Spark Dataframe API从NFS共享加载/读取文件,然后将该文件的数据保存/写入HDFS。

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF {
  private val source: String = "file:///data/files/"
  private val destination = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat : String = "com.databricks.spark.csv"

  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for(file<-fileArray){
//  reading csv file from shared location and taking whole data in a dataframe
    var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

//      variable for holding destination location : HDFS Location
    var finalDestination: String = destination+file.getName

//  saving data into HDFS
    writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
    }
  }

 def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
   try{
       sqlContext.read.format(fileFormat)
                       .option("header", header) // Use first line of all files as header
                       .option("inferSchema", inferSchema) // Automatically infer data types
                       .load(source)
   }
   catch{
     case ex: OnboardingException => {
            throw ex;
        }
   }
 }

 def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
   try{
       df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
   }
   catch{
     Case ez : OnboardingException => {
            throw ez;
        }
   }
 }
}

1)整个代码在哪里运行?它是在驱动程序上运行?还是同时使用两个worker?

2)load()和save()API是否在worker节点上运行,它是否并行工作?如果是,那么两个worker如何跟踪while的读写部分?

3)到现在为止,我在“for”循环中顺序地读取每个文件,并顺序地处理每个文件,有没有可能使它成为一个多线程应用程序,其中每个文件被分配给一个线程,以并行地执行端到端的读写。在这样做的时候,磁盘IO会有任何限制吗?


共1个答案

匿名用户

从另一个线程复制了一个很好的解释,用于我的查询:区分Apache Spark中的驱动程序代码和工作代码

在这里也复制它的一部分:转换创建的闭包中发生的所有事情都发生在worker上。它的意思是,如果在map(...)、filter(...)、mapPartitions(...)、groupby*(...)、aggregateby*(...)中传递了一些内容对工人执行死刑。它包括从持久存储或远程源读取数据。

像计数、减少(...)、折叠(...)这样的操作通常在司机和工人身上执行。沉重的起重是由工人并行执行的,一些最后的步骤,如减少从工人那里收到的输出,是在司机上依次执行的。

3)还在努力做到这一点,一旦做到就会回答这一点。

谢谢