我有一个大约5 M条目的切片(为了简单起见,假设每个条目都是一个字节切片,使用getIndexerItem
函数Map到索引器项),我将其平均分配给200个go例程。每个go例程然后调用长度为5 M/200的切片的push
函数。
根据我对Refresh: wait_for
的理解,每当向弹性请求时,只有当该请求所做的更改对搜索可见时(IMO翻译为批量请求队列不再有此特定请求),它才会完成。
error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]:
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]
字符串
所有的条目都指向同一个索引ankit-test
。
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
return esutil.BulkIndexerItem{
Index: index,
DocumentID: id,
Body: bytes.NewReader(body),
Action: "index",
DocumentType: "logs",
OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
fmt.Printf("error indexing item: %s\n", err.Error())
} else {
fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
}
},
}
}
func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Refresh: "wait_for",
NumWorkers: 1,
OnError: func(ctx context.Context, err error) {
fmt.Printf("received onError %s\n", err.Error())
},
})
if err != nil {
return nil, fmt.Errorf("error creating bulk indexer: %s", err)
}
ctx := context.Background()
for _, d := range data {
if err := indexer.Add(ctx, d); err != nil {
fmt.Printf("error adding data to indexer: %s\n", err)
}
}
if err := indexer.Close(ctx); err != nil {
fmt.Printf("error flushing and closing indexer: %s\n", err)
}
indexerStats := indexer.Stats()
return &indexerStats, nil
}
型
假设没有其他进程以任何方式与索引交互。
1条答案
按热度按时间thtygnil1#
使用多个ES文档,我能够找到解决上述问题的方法。以下答案是基于我的理解。如果您发现可以改进/纠正的地方,请发表评论。
下面是请求生命周期:
1.当批量请求到达集群中的节点(也称为协调节点)时,它将被整体放入批量队列,并由批量线程池中的线程进行处理。
1.协调节点根据文档需要路由到的碎片来拆分批量请求。每个批量子请求都被转发到保存相应主碎片的数据节点。批量子请求在该节点的批量队列中排队。如果队列中没有可用空间,则通知协调节点批量子请求已被拒绝。
1.一旦所有子请求都已完成或被拒绝,就会创建一个响应并将其返回给客户端。有可能(甚至很可能)批量请求中只有一部分文档被拒绝。
我的问题是我用
refresh = false
(默认)发送请求。而应该使用refresh = wait_for
。为什么?刷新提供了3种模式:1.假的:不执行与刷新相关的操作。此请求所做的更改将在请求返回后的某个时间点显示。在收到响应之前,请求不必已完成。请求可能仍在节点的队列中。
1.真的:在操作发生后立即刷新相关的主碎片和副本碎片。请确保在发回响应之前已完成请求。已从节点的队列中删除请求。
1.等待(_F):等待请求所做的更改通过刷新变为可见,然后再回复。与true不同的是,这不会强制立即刷新,而是等待刷新发生。开销比
refresh = true
低(就服务器负载而言),但仍确保请求已完成,然后才发回响应。请求已从节点队列中删除。所有数据都被重定向到同一个节点,并且由于
refresh = false
,在从队列中清除现有请求之前返回响应,从而导致溢出。