django logpipe(kafka)在django项目中的实现

wfveoks0  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(511)

我正在尝试在我的django项目中实现django logpipe。
我的目标是使用kafka(由django logpype间接实现)将rest\u framework.viewsets到数据库的请求组织成一个队列。
使用wemakedjango模板作为我的项目的起点,我成功地安装了“django logpipe”包并使用本教程启动它。迁移

python manage.py migrate logpipe

成功了。
这是我的项目雇佣关系:

bills
  config
  docker
  docs
  locale
  server
    apps
      ...
      bill
        migrations
        static
        __init__.py
        admin.py
        apps.py
        forms.py
        managers.py
        models.py
        permissions.py
        router.py
        serializers.py
        tests.py
        urls.py
        views.py
      ...

我已经为django logpipe准备好了系列化的修改。我的下一步:生产者和消费者。
据我所知,根据教程,我可以通过以下方式在apps.py中实现django loppipe的使用者:


# bills/server/apps/bill/apps.py

from logpipe import Consumer, register_consumer
from django.apps import AppConfig

class BillConfig(AppConfig):
     name: str
     name = 'bill'

# Register consumers with logpipe

@register_consumer
def build_person_consumer():
    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    return consumer

但是,我可以把制片人放在哪里?据我所知,生产者应该向消费者发送一个json数据,消费者应该在适当的数据结构中序列化它,并通过注册的序列化程序进行处理。我可以从django it self和restapi接收数据。
在rest_framework.viewsets中,我可以从api中看到数据的第一个地方是从billviewset中获取序列化程序:

class BillViewSet(viewsets.ModelViewSet):
    parser_classes = (MultiPartParser, FormParser,)
    queryset = Bill.objects.all()
    serializer_class = BillModelSerializer
    read_serializer_class = BillReadSerializer
    write_serializer_class = BillWriteSerializer
    scan_write_serializer_class = BillScanWriteSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly,
                        EditAuthority,
                        IsProperStatus]

    @action(detail=False, methods=['get', 'post', 'put'])
    def image(self, request):
        ...

    def perform_create(self, serializer):
        serializer.save(user=self.request.user)

    def perform_update(self, serializer):
        ...

    def get_serializer(self, *args,**kwargs) -> serializers.HyperlinkedModelSerializer:
        result = None
        if self.request.method in ['GET']:
            if hasattr(self, 'read_serializer_class'):
                kwargs['context'] = self.get_serializer_context()
                result = self.read_serializer_class(*args,**kwargs)
        elif (self.request.method in ['POST', 'PUT']) and ('data' in kwargs):
            if hasattr(self, 'write_serializer_class'):
                if isinstance(kwargs['data'], QueryDict):
                        buf = [k.replace('null', 'None') for k in kwargs['data'].keys()]
                        buf = [eval(i) for i in buf]
                        if self.request.method in ['POST']:
                            p_data = buf[0]['data']['0']  # type : Dict[str, Any] = List[Dict[str, Dict[Union[int, slice], Any]]
                        elif self.request.method in ['PUT']:
                            buf = list(buf[0]['data'].values())
                            p_data = buf[0]  # type : Dict[str, Union[int, str]]
                elif isinstance(kwargs['data'], Dict):
                    p_data = kwargs['data']
                kwargs['data'] = self.prepare_data(p_data, remove_none=False)
                kwargs['context'] = self.get_serializer_context()
                result = self.write_serializer_class(*args,**kwargs)
        if not result:
            result = super().get_serializer(*args,**kwargs)
        return result

    def get_options(self) -> Tuple[str, Dict[str, List[Dict[str, Union[str, int]]]]]:
        ...

    def prepare_data(self,
                    data: Dict[str, Union[int, str]],
                    remove_none: bool = True) -> Dict[str, Union[int, str]]:
        if 'action' in data and data['action'] == 'upload':
            return data
        else:
            buf = {}  # buf : Dict[str, Union[int, str]]
            fields_for_eval = ['supplier',
                            'nds_tax',
                            'category',
                            'status',
                            'user']
            fields_for_float = ['amount', 'amountNDS', 'amountPURE']
            for key, value in data.items():
                if key in fields_for_eval and isinstance(value, str):
                    buf[key] = eval(value)
                if key in fields_for_float:
                    buf[key] = float(value if value else 0)
                if key in ['scan']:
                    if (not value) or value in ['None', 'null']:
                        buf[key] = None
            for k, v in buf.items():
                if not v and remove_none:
                    data.pop(k)
                else:
                    data[k] = v
            return data

    def get_queryset(self):
        queryset = Bill.objects.all()
        if not self.action == 'destroy':
            queryset = self.get_serializer_class().setup_eager_loading(queryset=queryset)
        return queryset

    class Meta:
        datatables_extra_json = ('get_options', )

它在apiview类中的进程“dispatch”方法期间从list、create、update mixins调用。第一个想法是在get\ u serializer方法中实现producer并从中发送数据,但这毫无意义,因为在standart viewset处理过程中数据会重复。
如果不重写标准的viewset mixins或apiview,我就不能使用get\序列化程序-这会破坏逻辑。我就是找不到一个简单的解决办法。我有什么办法?
我想一定有办法使它简单化。如果django logpipe allready在django中高度集成,那么必须有一些选项。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题