canal MySQL-to-ES 7.9.2 sync: when pk is configured and _id is not, data is not synced to ES

yqkkidmi, posted 2022-10-19 in Mysql
  • I have searched the issues of this repository and believe that this is not a duplicate.
  • I have checked the FAQ of this repository and believe that this is not a duplicate.

environment

  • canal version: 1.1.5-SNAPSHOT
  • mysql version: 8.0.20
  • ES version: 7.9.2
  • canal-adapter version: 1.1.5-SNAPSHOT

Issue Description

With the configuration below, I inserted a row into the gsms_carrier table in the database, but it was not synced to ES.

Configuration:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: gsms_carrier
  pk: id  # per the official docs: _id is not configured, so pk must be configured
  sql: "select id,name,sort,showed from gsms_carrier"
  etlCondition: ""
  commitBatch: 3000

Log output:

2020-11-02 16:49:45.203 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":7,"name":"联通5G","sort":0,"showed":1}],"database":"clues","destination":"example","es":1604306984000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"gsms_carrier","ts":1604306985001,"type":"INSERT"} 
Affected indexes: gsms_carrier

Steps to reproduce

1. Start canal and canal-adapter.
2. Insert a row into the gsms_carrier table in the database.

Expected behaviour

The inserted row is synced to ES.

Actual behaviour

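It is not. This is odd: the log already prints the DML message for this row, which suggests it was parsed correctly, so why was it never written to ES?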

js4nwp54 (answer #1)

I traced through the source code and found a problem.

The insert method of ES7xTemplate:
public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
    if (mapping.get_id() != null) {
        // ......
        // This branch handles the case where _id is configured; the else branch
        // below handles the case where _id is not configured but pk is.
    } else {
        // Query ES using the field configured as pk
        ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
            .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
            .size(10000);
        SearchResponse response = esSearchRequest.getResponse();
        // Turn every hit into an update request
        for (SearchHit hit : response.getHits()) {
            ESUpdateRequest esUpdateRequest = this.esConnection.new ES7xUpdateRequest(mapping.get_index(),
                hit.getId()).setDoc(esFieldData);
            getBulk().add(esUpdateRequest);
            commitBulk();
        }
        // The case where the query finds nothing (a genuinely new row) is never handled!
    }
}

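The else branch above roughly does this: it first searches ES by the configured pk field and updates whatever it finds. But what if nothing is found? For an insert, the row most likely does not exist in ES yet, so that is the more common case; the case where a matching document already exists and should be updated still has to be handled as well.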
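To make the gap concrete, here is a minimal, self-contained sketch that models the pk branch against a hypothetical in-memory index. `InMemoryIndex`, `insertCurrent` and `insertPatched` are illustrative names only, not canal or Elasticsearch APIs. `insertCurrent` mirrors the quoted else branch (update only what the pk query finds); `insertPatched` adds the missing fallback of indexing a new document when the query finds nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of the pk-based (no _id) branch of ES7xTemplate.insert.
 * InMemoryIndex is a stand-in for one ES index; it is NOT the canal or Elasticsearch API.
 */
public class PkInsertSketch {

    /** Hypothetical stand-in for an ES index: document id -> source fields. */
    static class InMemoryIndex {
        final Map<String, Map<String, Object>> docs = new LinkedHashMap<>();
        private int nextId = 1;

        /** Rough analogue of a term query: ids of docs whose field equals the value. */
        List<String> termQuery(String field, Object value) {
            List<String> ids = new ArrayList<>();
            for (Map.Entry<String, Map<String, Object>> e : docs.entrySet()) {
                if (value.equals(e.getValue().get(field))) {
                    ids.add(e.getKey());
                }
            }
            return ids;
        }

        /** Partial update (like setDoc): merge fields into an existing doc. */
        void update(String id, Map<String, Object> fields) {
            docs.get(id).putAll(fields);
        }

        /** Index a new doc under a generated id (ES generates one when no id is given). */
        String index(Map<String, Object> fields) {
            String id = "auto-" + (nextId++);
            docs.put(id, new HashMap<>(fields));
            return id;
        }
    }

    /** Mirrors the quoted else branch: only documents found by the pk query are updated. */
    static void insertCurrent(InMemoryIndex index, String pk, Object pkVal, Map<String, Object> fields) {
        for (String id : index.termQuery(pk, pkVal)) {
            index.update(id, fields);
        }
        // No hits -> nothing is written, so a brand-new row is silently dropped.
    }

    /** Patched sketch: update existing matches, otherwise index a new document. */
    static void insertPatched(InMemoryIndex index, String pk, Object pkVal, Map<String, Object> fields) {
        List<String> hits = index.termQuery(pk, pkVal);
        if (hits.isEmpty()) {
            index.index(fields); // the missing "not found -> insert" path
            return;
        }
        for (String id : hits) {
            index.update(id, fields);
        }
    }

    public static void main(String[] args) {
        // Row taken from the INSERT event in the issue's log.
        Map<String, Object> row = new HashMap<>();
        row.put("id", 7);
        row.put("name", "联通5G");
        row.put("sort", 0);
        row.put("showed", 1);

        InMemoryIndex current = new InMemoryIndex();
        insertCurrent(current, "id", 7, row);
        System.out.println("current: " + current.docs.size() + " doc(s)");

        InMemoryIndex patched = new InMemoryIndex();
        insertPatched(patched, "id", 7, row);
        System.out.println("patched: " + patched.docs.size() + " doc(s)");
    }
}
```

Running `main` prints `current: 0 doc(s)` followed by `patched: 1 doc(s)`. One caveat if this were applied to real Elasticsearch: search is near real-time, so a document indexed a moment ago may not yet be visible to the next pk query, and two rapid inserts with the same pk could create duplicates. Using the pk value itself as the document id with an upsert would avoid that lookup entirely.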
