你必须原谅我的笨拙,但我正在尝试建立一个spark集群,连接到运行python脚本的cassandra,目前我正在使用datastax enterprise在solr搜索模式下运行cassandra。我知道,为了使用datastax提供的spark-cassandra连接器,您必须在分析模式下运行cassandra(使用-k选项)。目前,我只让它在dse spark版本上工作,为了让它工作,我遵循了以下步骤:
此外,我单独使用火花(不是dse版本)进行了另一个测试,试图包含使驱动程序类可访问的java包,我这样做了:
我还尝试运行pyspark shell并测试sc是否具有cassandraTable方法,以查看驱动程序是否已加载但没有成功,在这两种情况下我都收到以下错误消息:
AttributeError: 'SparkContext' object has no attribute 'cassandraTable'
我的目标是理解我必须做什么来使非dse spark版本与cassandra连接,并使来自驱动程序的方法可用。
我还想知道是否可以将dse spark cassandra连接器与未与dse一起运行的cassandra节点一起使用。
谢谢你的帮助
我在独立的python脚本中使用了pyspark。我不使用DSE,我从datastax的github存储库中克隆了cassandra-spark-connect并使用datastax指令进行编译。
为了访问spark中的spark连接器,我复制到spark安装中的jars文件夹。
我认为这对你也有好处:
cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/
你可以参观这个,在那里我解释了我自己设置环境的经历。
一旦 spark 可以访问 Cassandra 连接器,您就可以使用 pyspark 库作为包装器:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder \
.appName('SparkCassandraApp') \
.config('spark.cassandra.connection.host', 'localhost') \
.config('spark.cassandra.connection.port', '9042') \
.config('spark.cassandra.output.consistency.level','ONE') \
.master('local[2]') \
.getOrCreate()
ds = sqlContext \
.read \
.format('org.apache.spark.sql.cassandra') \
.options(table='tablename', keyspace='keyspace_name') \
.load()
ds.show(10)
在本例中,您可以看到整个脚本。
以下是如何在非dse版本中将spack-shell连接到cassandra。
将spark-cassandra连接器
jar复制到park/spark-hadoop目录/jars/
spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar
在Spark shell中执行这些命令
sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
如果cassandra有密码设置等,则必须提供更多参数:)