Multi-partition batch in Cassandra using Golang and gocqlx

Posted by 2w3kk1z5 on 2021-06-15 in Cassandra

I am trying to perform a multi-partition batch in Cassandra using Golang and gocqlx.
My problem is that I cannot get my batch statement to work.
My goals are as follows:

1. Perform an INSERT to second_table.
2. Perform an UPDATE to first_table.
3. Batch the above 2 operations because they need to be all-or-nothing.
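For reference, the three goals correspond to a single CQL logged batch that wraps the INSERT and the UPDATE. The sketch below only assembles the batch text with the standard library; `buildLoggedBatch` is a hypothetical helper, not part of gocql or gocqlx, and the exact text a query builder emits may differ.

```go
package main

import "strings"

// buildLoggedBatch wraps the given CQL statements in BEGIN BATCH ... APPLY BATCH.
// Hypothetical helper for illustration only; it is not a gocql or gocqlx function.
func buildLoggedBatch(stmts ...string) string {
	var b strings.Builder
	b.WriteString("BEGIN BATCH ")
	for _, s := range stmts {
		// Normalize each statement so it ends with exactly one semicolon.
		b.WriteString(strings.TrimSuffix(strings.TrimSpace(s), ";"))
		b.WriteString("; ")
	}
	b.WriteString("APPLY BATCH")
	return b.String()
}
```

Calling `buildLoggedBatch` with the INSERT for second_table and the UPDATE for first_table yields one statement string. The bind markers of both statements then have to be filled in order, INSERT first and UPDATE second.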

I have the following table structure in Cassandra:

keyspace: mykeyspace

first_table:
    uuid
    second_table_uuid  <-- serves as a link to second_table
    create_timestamp
    update_timestamp

second_table:
    uuid
    first_table_uuid   <-- serves as a link to first_table
    create_timestamp
    update_timestamp

Here is my code:

func InsertToSecondTable(payload payload.SecondTable, session *gocql.Session) (string, error) {
    // Make sure payload UUID exists on `first_table`.
    firstTableSelectStmt, firstTableSelectNames := qb.Select("mykeyspace.first_table").
        Where(qb.Eq("uuid")).
        ToCql()

    var firstTableRow table.FirstTable

    q := gocqlx.Query(session.Query(firstTableSelectStmt), firstTableSelectNames).BindMap(qb.M{
        "uuid": payload.UUID,
    })

    // Will return a `not found` error if no matches are found.
    if err := q.GetRelease(&firstTableRow); err != nil {
        return "", err
    }

    // Prepare INSERT to `second_table` query
    now := time.Now()

    uuid, err := gocql.RandomUUID()
    if err != nil {
        return "", err
    }

    var secondTableRow table.SecondTable
    secondTableRow.UUID = uuid
    secondTableRow.CreateTimestamp = now
    secondTableRow.UpdateTimestamp = now

    // Handle UUIDs.
    secondTableRow.FirstTableUUID, _ = getUUIDFromString(payload.UUID)

    secondTableInsertStmt, secondTableInsertNames := qb.Insert("mykeyspace.second_table").
        Columns("uuid", "first_table_uuid", "create_timestamp", "update_timestamp").
        ToCql()

    // Prepare UPDATE on `first_table` query.
    firstTableUpdateStmt, firstTableUpdateNames := qb.Update("mykeyspace.first_table").
        Set("second_table_uuid", "update_timestamp").
        Where(qb.Eq("uuid")).
        ToCql()

    // Start a Batch.
    // This is because we want the INSERT and UPDATE to be all-or-nothing.
    finalStmt, finalNames := qb.Batch().AddStmt(secondTableInsertStmt, secondTableInsertNames).
        AddStmt(firstTableUpdateStmt, firstTableUpdateNames).
        ToCql()

    // Had to change the anonymous struct field names so they don't conflict.
    // Use DB tags to match the INSERT and UPDATE statements.
    // Note: the `uuid`, `create_timestamp` and `update_timestamp` tags each
    // appear twice, once per table.
    batchStruct := struct {
        // second_table
        SecondTableUUIDPK          gocql.UUID `db:"uuid"`
        FirstTableUUID             gocql.UUID `db:"first_table_uuid"`
        SecondTableCreateTimestamp time.Time  `db:"create_timestamp"`
        SecondTableUpdateTimestamp time.Time  `db:"update_timestamp"`
        // first_table
        FirstTableUUIDPK          gocql.UUID `db:"uuid"`
        SecondTableUUID           gocql.UUID `db:"second_table_uuid"`
        FirstTableCreateTimestamp time.Time  `db:"create_timestamp"`
        FirstTableUpdateTimestamp time.Time  `db:"update_timestamp"`
    }{
        // second_table
        SecondTableUUIDPK:          secondTableRow.UUID,
        FirstTableUUID:             secondTableRow.FirstTableUUID,
        SecondTableCreateTimestamp: secondTableRow.CreateTimestamp,
        SecondTableUpdateTimestamp: secondTableRow.UpdateTimestamp,
        // first_table
        FirstTableUUIDPK:          firstTableRow.UUID,
        SecondTableUUID:           firstTableRow.SecondTableUUID,
        FirstTableCreateTimestamp: firstTableRow.CreateTimestamp,
        FirstTableUpdateTimestamp: firstTableRow.UpdateTimestamp,
    }

    if err := gocqlx.Query(session.Query(finalStmt), finalNames).
        BindStruct(&batchStruct).
        ExecRelease(); err != nil {
        return "", err
    }

    return uuid.String(), nil
}
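
One likely source of trouble in the code above is that both statements use the bind names `uuid` and `update_timestamp`, so a single struct or map cannot carry a different value for each statement. A common way around this is to prefix the names per statement. The sketch below models that idea with the standard library only; `prefixNames`, `bindArgs`, `batchNames` and the prefixes `second` and `first` are hypothetical names chosen for illustration. gocqlx's batch builder appears to provide `AddStmtWithPrefix` for the same purpose, but treat that as an assumption to verify against the gocqlx version in use.

```go
package main

import "fmt"

// prefixNames puts "<prefix>." in front of every bind name of one statement,
// so names coming from different statements can no longer collide.
func prefixNames(prefix string, names []string) []string {
	out := make([]string, len(names))
	for i, n := range names {
		out[i] = prefix + "." + n
	}
	return out
}

// bindArgs resolves each bind name to its value, in statement order.
// It returns an error for the first name that has no value in the map.
func bindArgs(names []string, values map[string]interface{}) ([]interface{}, error) {
	args := make([]interface{}, 0, len(names))
	for _, n := range names {
		v, ok := values[n]
		if !ok {
			return nil, fmt.Errorf("no value for bind name %q", n)
		}
		args = append(args, v)
	}
	return args, nil
}

// batchNames returns the combined bind names for the INSERT into second_table
// followed by the UPDATE on first_table, each list prefixed by its own label.
func batchNames() []string {
	insertNames := []string{"uuid", "first_table_uuid", "create_timestamp", "update_timestamp"}
	updateNames := []string{"second_table_uuid", "update_timestamp", "uuid"}
	return append(prefixNames("second", insertNames), prefixNames("first", updateNames)...)
}
```

With prefixed names, the values map can hold `second.uuid` (the new second_table row) and `first.uuid` (the existing first_table row) side by side, and `first.second_table_uuid` can be set to the newly generated UUID so that the UPDATE actually links the two rows.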

This is probably something very obvious, but unfortunately I am new to Cassandra and gocqlx.
