我试图从Cassandra那里读取数据,并将特定索引写入redis。假设redis db 5。
我需要以hashmap格式将所有数据写入redis db index 5。
val spark = SparkSession.builder()
.appName("redis-df")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.config("spark.redis.db", 5)
.config("spark.cassandra.connection.host", "localhost")
.getOrCreate()
import spark.implicits._
val someDF = Seq(
(8, "bat"),
(64, "mouse"),
(-27, "horse")
).toDF("number", "word")
someDF.write
.format("org.apache.spark.sql.redis")
.option("keys.pattern", "*")
//.option("table", "person"). // Is it mandatory ?
.save()
我可以在没有表名的情况下将数据保存到redis吗?实际上我只是想把所有的数据都保存到redis index 5中而不使用表名,这有可能吗?我已经阅读了spark redis连接器的文档,但没有看到任何与此相关的示例。文档链接:https://github.com/redislabs/spark-redis/blob/master/doc/dataframe.md#writing
我目前正在使用这个版本的spark redis连接器
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.11</artifactId>
<version>2.5.0</version>
</dependency>
有人面对过这个问题吗?有解决办法吗?
如果我在配置文件中没有提到表名,就会出现错误
失败
java.lang.IllegalArgumentException: Option 'table' is not set.
at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$tableName$1.apply(RedisSourceRelation.scala:208)
at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$tableName$1.apply(RedisSourceRelation.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.redis.RedisSourceRelation.tableName(RedisSourceRelation.scala:208)
at org.apache.spark.sql.redis.RedisSourceRelation.saveSchema(RedisSourceRelation.scala:245)
at org.apache.spark.sql.redis.RedisSourceRelation.insert(RedisSourceRelation.scala:121)
at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:30)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
2条答案
按热度按时间bvuwiixz1#
我不同意。我处理的问题和你完全一样。以下是我的发现:
必须引用表或键模式(e、 g.)
df=spark.read.format(“org.apache.spark.sql.redis”)
.option(“keys.pattern”,“rec-*”)
.option(“infer.schema”,true).load()
在我的例子中,我使用的是一个哈希,哈希键都以“rec-”开头,后跟一个int。如前所述,诀窍是如果您想将数据读回spark。它需要一个表名,但似乎使用冒号作为分隔符。因为我想进行读/写操作,所以我只需将表名改为“rec:”,就可以了。
我认为您的困惑源于这样一个事实,即在您的示例中,您在spark中只定义了一个记录。如果你有两个呢?redis需要创建两个不同的键,比如“person:1“或”person:2". 它使用术语表来描述“人”。是钥匙还是table?文件似乎不一致。
我目前的问题是,如何通过改变db context.config(“spark.redis.db”,5)来保存到不同的redis db。当我在df.write.format中使用它时,这似乎对我不起作用。有什么想法吗?
ecbunoof2#
table选项是必需的。其思想是指定表名,这样就可以从提供该表名的redis读回Dataframe。在您的示例中,另一个选项是将Dataframe转换为键/值rdd并使用
sc.toRedisKV(rdd)