我已经在apachespark运行的地方建立了一个小规模的hadoopYarn集群。我有一些数据(json,csv)上传到spark(Dataframe)进行分析。稍后,我必须将所有Dataframe数据索引到apachesolr中。我使用的是spark3和solr8.8版本。在我的搜索中,我在这里找到了一个解决方案,但它适用于不同版本的spark。因此,我决定向别人要这个。此任务是否有任何内置选项。我愿意使用solrj和pyspark(不是scal shell)。
56lgkhnf1#
我自己找到了解决办法。到目前为止,lucidword spark solr模块还不支持spark(3.0.2)和solr(8.8)的这些版本。我首先安装了pysolr模块,然后使用以下示例代码完成我的工作:
import pysolrimport jsondef solrIndexer(row): solr = pysolr.Solr('http://localhost:8983/solr/spark-test') obj = json.loads(row) solr.add(obj)# load data to dataframe from HDFScsvDF = spark.read.load("hdfs://hms/data/*.csv", format="csv", sep=",", inferSchema="true", header="true")csvDF.toJSON().map(solrIndexer).collect()
import pysolr
import json
def solrIndexer(row):
solr = pysolr.Solr('http://localhost:8983/solr/spark-test')
obj = json.loads(row)
solr.add(obj)
# load data to dataframe from HDFS
csvDF = spark.read.load("hdfs://hms/data/*.csv", format="csv", sep=",", inferSchema="true", header="true")
csvDF.toJSON().map(solrIndexer).collect()
如果以上代码中有更好的选择或改进,欢迎您回答。
1条答案
按热度按时间56lgkhnf1#
我自己找到了解决办法。到目前为止,lucidword spark solr模块还不支持spark(3.0.2)和solr(8.8)的这些版本。我首先安装了pysolr模块,然后使用以下示例代码完成我的工作:
如果以上代码中有更好的选择或改进,欢迎您回答。