提问者:小点点

如何使用火花卡桑德拉连接器将火花与卡桑德拉连接?


你必须原谅我的笨拙,但我正在尝试建立一个spark集群,连接到运行python脚本的cassandra,目前我正在使用datastax enterprise在solr搜索模式下运行cassandra。我知道,为了使用datastax提供的spark-cassandra连接器,您必须在分析模式下运行cassandra(使用-k选项)。目前,我只让它在dse spark版本上工作,为了让它工作,我遵循了以下步骤:

    < li >在分析模式下启动DSE Cassandra < li >将$PYTHONPATH环境变量更改为/path/to/spark/DSE/python:/path/to/spark/DSE/python/lib/py4j-*。zip:$PYTHONPATH < li >使用< code > python test-script . py 作为root运行独立脚本

此外,我单独使用火花(不是dse版本)进行了另一个测试,试图包含使驱动程序类可访问的java包,我这样做了:

    < li >将SPARK . driver . extra class path =/path/to/SPARK-Cassandra-connector-snapshot . jar添加到文件SPARK-defaults . conf 2 . execute < code > $ SPARK _ HOME/bin/SPARK-submit-packages com . datas tax . SPARK:SPARK-Cassandra...

我还尝试运行pyspark shell并测试sc是否具有cassandraTable方法,以查看驱动程序是否已加载但没有成功,在这两种情况下我都收到以下错误消息:

AttributeError: 'SparkContext' object has no attribute 'cassandraTable'

我的目标是理解我必须做什么来使非dse spark版本与cassandra连接,并使来自驱动程序的方法可用。

我还想知道是否可以将dse spark cassandra连接器与未与dse一起运行的cassandra节点一起使用。

谢谢你的帮助


共2个答案

匿名用户

我在独立的python脚本中使用了pyspark。我不使用DSE,我从datastax的github存储库中克隆了cassandra-spark-connect并使用datastax指令进行编译。

为了访问spark中的spark连接器,我复制到spark安装中的jars文件夹。

我认为这对你也有好处:

 cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/

你可以参观这个,在那里我解释了我自己设置环境的经历。

一旦 spark 可以访问 Cassandra 连接器,您就可以使用 pyspark 库作为包装器:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder \
  .appName('SparkCassandraApp') \
  .config('spark.cassandra.connection.host', 'localhost') \
  .config('spark.cassandra.connection.port', '9042') \
  .config('spark.cassandra.output.consistency.level','ONE') \
  .master('local[2]') \
  .getOrCreate()

ds = sqlContext \
  .read \
  .format('org.apache.spark.sql.cassandra') \
  .options(table='tablename', keyspace='keyspace_name') \
  .load()

ds.show(10)

在本例中,您可以看到整个脚本。

匿名用户

以下是如何在非dse版本中将spack-shell连接到cassandra。

spark-cassandra连接器jar复制到park/spark-hadoop目录/jars/

spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar

在Spark shell中执行这些命令

sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import  org.apache.spark.sql.cassandra._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)

如果cassandra有密码设置等,则必须提供更多参数:)