java—如何使用mapreduce批量更新满足查询的数据存储实体?

uxhixvfz  于 2021-06-30  发布在  Java
关注(0)|答案(2)|浏览(454)

我想使用mapreduce库来更新满足查询的所有实体。有几个复杂问题:
查找要更新的实体的查询将检查特定属性“property1”的值是否包含在csv文件中的一长串值(约10000个条目)中
对于满足查询的每个实体,另一个属性“property2”需要更新为等于csv文件第二列和同一行中的值
我知道如何将csv文件上传到blobstore,并使用blobstore输入读取器读取每一行。我还了解使用查询获取实体的数据存储输入读取器。
我的问题是如何创建一个Map器类,从blobstore读取输入数据,获取数据存储实体并尽可能高效地更新它们?

p4tfgftt

p4tfgftt1#

考虑到property1的可能值列表很长,使用查询进行筛选似乎不是一个好的选择(因为您需要使用in-filter,它实际上为每个值运行一个查询)
使用mr的另一种方法是使用Map(从property1到property2)将csv加载到内存中,然后触发一个mr作业,该作业迭代所有实体,如果它们的property1是Map上的键的一部分,则使用Map值修改它。
正如@ryan b所说,如果你只是想利用批量卖出的优势,你不需要使用mr,因为你可以使用 Iterable 使用数据存储服务放置。

9gm1akwq

9gm1akwq2#

您可以使用datastoreinputreader,在map函数中,找出属性1是否真的在csv中:每次从csv中读取都会非常慢,您可以使用memcache在从自己的数据存储模型中读取一次之后提供该信息。为了填充数据存储模型,我建议使用property1作为每一行的自定义id,这样查询它就很简单了。您只需要为那些实际更改的值更新数据存储,并使用变异池使其具有性能(op.db.put())。我给你留下伪代码(对不起。。。我只在python中有它)关于不同部分的外观,我进一步建议您阅读这篇关于google app engine上mapreduce的文章:http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/


# to get the to_dict method

from google.appengine.ext import ndb
from mapreduce import operation as op 
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline

class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """

    def run(self, *args,**kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)

class csvrow(ndb.Model):
  #you dont store property 1 because you are going to use its value as key
  substitutefield=ndb.StringProperty()

def create_csv_datastore():
  # instead of running this, make a 10,000 row function with each csv value, 
  # or read it from the blobstore, iterate and update the values accordingly
  for i in range(10000):
    #here we are using our own key as id of this row and just storing the other column that
    #eventually will be subtitute if it matches
    csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put()

def queryfromcsv(property1):
  csvrow=ndb.Key('csvrow', property1).get()
  if csvrow:
    return csvrow.substitutefield
  else:
    return property1

def property1InCSV(property1):
  data = memcache.get(property1)
  if data is not None:
      return data
  else:
      data = self.queryfromcsv(property1)
      memcache.add(property1, data, 60)
      return data

def datastore_map(entity_type):
  datastorepropertytocheck = entity_type.property1
  newvalue = property1InCSV(datastorepropertytocheck)
  if newvalue!=datastoreproperty:
    entity_type.property11 = newvalue
    #use the mutation pool
    yield op.db.Put(entity)

相关问题