我有一堆CSV文件需要迁移到Elastichsearch,我设法使用Logstash版本7.16.3,索引已经在Elastichsearch上使用适当的Map创建。配置文件如下:
input{
file{
path=> "C:/Users/fr-pa/Documents/wikidata/extracted/*.csv"
start_position => "beginning"
sincedb_path => "NULL"
} } filter{
csv{
separator => ","
columns =>["id", "type", "arlabel", "enlabel","araliases",
"enaliases","ardescription","endescription","maincategory",
"arwiki", "enwiki","arwikiquote", "enwikiquote"]
} } output{
elasticsearch{
hosts=> "http://localhost:9200/"
index => "wikidata_index"
}
stdout {
} }
但数据未迁移,则Logstash:
有人知道问题出在哪里吗?
这是我的索引
索引设置
request_body = {
"settings": {
"analysis": {
"filter": {
"arabic_stop": {
"type": "stop",
"stopwords": "_arabic_"
},
"arabic_keywords": {
"type": "keyword_marker",
"keywords": ["مثال"]
},
"arabic_stemmer": {
"type": "stemmer",
"language": "arabic"
},
"english_stop": {
"type": "stop",
"stopwords": "_english_"
},
"english_keywords": {
"type": "keyword_marker",
"keywords": ["example"]
},
"english_stemmer": {
"type": "stemmer",
"language": "english"
},
"english_possessive_stemmer": {
"type": "stemmer",
"language": "possessive_english"
}
},
"analyzer": {
"rebuilt_arabic": {
"tokenizer": "standard",
"filter": [
"lowercase",
"decimal_digit",
"arabic_stop",
"arabic_normalization",
"arabic_keywords",
"arabic_stemmer"
]
},
"comma_split":{
"type" : "pattern",
"pattern" : ","
},
"rebuilt_english": {
"tokenizer": "standard",
"filter": [
"english_possessive_stemmer",
"lowercase",
"english_stop",
"english_keywords",
"english_stemmer"
]
}
}
}
} ,
"mappings": {
"properties": {
"id": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text",
"analyzer": "comma_split"
},
"arlabel": {
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enlabel": {
"type": "text",
"analyzer": "rebuilt_english"
},
"araliases": {
"type": "text",
"analyzer": "comma_split"
},
"enaliases": {
"type": "text",
"analyzer": "comma_split"
},
"ardescription":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"endescription":{
"type": "text",
"analyzer": "rebuilt_english"
},
"maincategory":{
"type": "text",
"analyzer": "comma_split"
},
"arwiki":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enwiki":{
"type": "text",
"analyzer": "rebuilt_english"
},
"arwikiquote":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enwikiquote": {
"type": "text",
"analyzer": "rebuilt_english"
}
}
}
}
请注意,有些字段包含空值,我尝试使用python Bulk helper类插入数据:
with open(full_path,encoding="utf8") as f:
reader = csv.DictReader(f)
print(reader)
helpers.bulk(es, reader, index='wikidata_index')
引发的错误为:
C:\Users\fr-pa\Documents\wikidata\extracted\till_Q10091689_item.csv
<csv.DictReader object at 0x0000028E86C47EB0>
---------------------------------------------------------------------------
BulkIndexError Traceback (most recent call last)
<ipython-input-42-3849641bd8f9> in <module>
5 reader = csv.DictReader(f)
6 print(reader)
----> 7 helpers.bulk(es, reader, index='wikidata_index')
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, ignore_status, *args, **kwargs)
408 # make streaming_bulk yield successful results so we can count them
409 kwargs["yield_ok"] = True
--> 410 for ok, item in streaming_bulk(
411 client, actions, ignore_status=ignore_status, *args, **kwargs
412 ):
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, ignore_status, *args, **kwargs)
327
328 try:
--> 329 for data, (ok, info) in zip(
330 bulk_data,
331 _process_bulk_chunk(
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, ignore_status, *args, **kwargs)
254 raise_on_error=raise_on_error,
255 )
--> 256 for item in gen:
257 yield item
258
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error)
185
186 if errors:
--> 187 raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
188
189
BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},
2条答案
按热度按时间sdnqo3pr1#
从堆栈跟踪的最后一行可以明显看出这个问题:
批量索引错误:('500个文档无法索引。',[{'index':'类型':"文档“、"标识”:“状态”:400,'错误':{“类型”:“Map器解析异常”,“原因”:'无法剖析','caused_by':{'type':'非法参数异常','原因':'字段名不能为空字符串'}},
您需要删除字段或用空键替换字段,例如以this one的方式。
inkz8wg92#