使用golang和gocqlx在cassandra中进行多分区批处理

2w3kk1z5  于 2021-06-15  发布在  Cassandra
关注(0)|答案(0)|浏览(357)

我正在尝试使用golang和gocqlx在cassandra中执行多分区批处理。
我的问题是我不能让我的批处理语句正常工作。
我的目标如下:

  1. 1. Perform an INSERT to second_table.
  2. 2. Perform an UPDATE to first_table.
  3. 3. Batch the above 2 operations because they need to be all-or-nothing.

我在cassandra中有以下表格结构:

  1. keyspace: mykeyspace
  2. first_table:
  3. uuid
  4. second_table_uuid <-- serves as a link to second_table
  5. create_timestamp
  6. update_timestamp
  7. second_table:
  8. uuid
  9. first_table_uuid <-- serves as a link to first_table
  10. create_timestamp
  11. update_timestamp

下面是我的代码:

  1. func InsertToSecondTable(payload payload.SecondTable, session *gocql.Session) (string, error) {
  2. // Make sure payload UUID exists on `first_table`.
  3. firstTableSelectStmt, firstTableSelectNames := qb.Select("mykeyspace.first_table").
  4. Where(qb.Eq("uuid")).
  5. ToCql()
  6. var firstTableRow table.FirstTable
  7. q := gocqlx.Query(session.Query(firstTableSelectStmt), firstTableSelectNames).BindMap(qb.M{
  8. "uuid": payload.UUID,
  9. })
  10. // Will return a `not found` error if no matches are found.
  11. if err := q.GetRelease(&firstTableRow); err != nil {
  12. return "", err
  13. }
  14. // Prepare INSERT to `second_table` query
  15. now := time.Now()
  16. uuid, _ := gocql.RandomUUID()
  17. var secondTableRow table.SecondTable
  18. secondTableRow.UUID = uuid
  19. secondTableRow.CreateTimestamp = now
  20. secondTableRow.UpdateTimestamp = now
  21. // Handle UUIDs.
  22. secondTableRow.FirstTableUUID, _ = getUUIDFromString(payload.UUID)
  23. secondTableInsertStmt, secondTableInsertNames := qb.Insert("mykeyspace.second_table").
  24. Columns("uuid", "first_table_uuid", "create_timestamp", "update_timestamp").
  25. ToCql()
  26. // Prepare UPDATE on `first_table` query.
  27. firstTableUpdateStmt, firstTableUpdateNames := qb.Update("mykeyspace.first_table").
  28. Set("second_table_uuid", "update_timestamp").
  29. Where(qb.Eq("uuid")).
  30. ToCql()
  31. // Start a Batch.
  32. // This is because we want the INSERT and UPDATE to be all-or-nothing.
  33. finalStmt, finalNames := qb.Batch().AddStmt(secondTableInsertStmt, secondTableInsertNames).
  34. AddStmt(firstTableUpdateStmt, firstTableUpdateNames).
  35. ToCql()
  36. // Had to change the anonymous struct field names so they don't conflict.
  37. // Use DB tags to match the INSERT and UPDATE statements.
  38. batchStruct := struct {
  39. // second_table
  40. SecondTableUUIDPK gocql.UUID `db:"uuid"`
  41. FirstTableUUID gocql.UUID `db:"first_table_uuid"`
  42. SecondTableCreateTimestamp time.Time `db:"create_timestamp"`
  43. SecondTableUpdateTimestamp time.Time `db:"update_timestamp"`
  44. // first_table
  45. FirstTableUUIDPK gocql.UUID `db:"uuid"`
  46. SecondTableUUID gocql.UUID `db:"second_table_uuid"`
  47. FirstTableTimestamp time.Time `db:"create_timestamp"`
  48. FirstTableTimestamp time.Time `db:"update_timestamp"`
  49. }{
  50. // second_table
  51. SecondTableUUIDPK: secondTableRow.UUID,
  52. FirstTableUUID: secondTableRow.FirstTableUUID,
  53. SecondTableCreateTimestamp: secondTableRow.CreateTimestamp,
  54. SecondTableUpdateTimestamp: secondTableRow.UpdateTimestamp,
  55. // first_table
  56. MainUUIDPK: firstTableRow.UUID,
  57. AdditionalDataUUID: firstTableRow.SecondTableUUID,
  58. MainCreateTimestamp: firstTableRow.CreateTimestamp,
  59. MainUpdateTimestamp: firstTableRow.UpdateTimestamp,
  60. }
  61. err = gocqlx.Query(session.Query(finalStmt), finalNames).
  62. BindStruct(&batchStruct).
  63. ExecRelease()
  64. if err != nil {
  65. return "", err
  66. }
  67. return uuid.String(), nil
  68. }

这可能是非常明显的事情,但遗憾的是,我对Cassandra和戈克勒克斯还不熟悉。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题