我正在尝试在我的django项目中实现django logpipe。
我的目标是使用kafka(由django logpype间接实现)将rest\u framework.viewsets到数据库的请求组织成一个队列。
使用wemakedjango模板作为我的项目的起点,我成功地安装了“django logpipe”包并使用本教程启动它。迁移
python manage.py migrate logpipe
成功了。
这是我的项目雇佣关系:
bills
config
docker
docs
locale
server
apps
...
bill
migrations
static
__init__.py
admin.py
apps.py
forms.py
managers.py
models.py
permissions.py
router.py
serializers.py
tests.py
urls.py
views.py
...
我已经为django logpipe准备好了系列化的修改。我的下一步:生产者和消费者。
据我所知,根据教程,我可以通过以下方式在apps.py中实现django loppipe的使用者:
# bills/server/apps/bill/apps.py
from logpipe import Consumer, register_consumer
from django.apps import AppConfig
class BillConfig(AppConfig):
name: str
name = 'bill'
# Register consumers with logpipe
@register_consumer
def build_person_consumer():
consumer = Consumer('people')
consumer.register(PersonSerializer)
return consumer
但是,我可以把制片人放在哪里?据我所知,生产者应该向消费者发送一个json数据,消费者应该在适当的数据结构中序列化它,并通过注册的序列化程序进行处理。我可以从django it self和restapi接收数据。
在rest_framework.viewsets中,我可以从api中看到数据的第一个地方是从billviewset中获取序列化程序:
class BillViewSet(viewsets.ModelViewSet):
parser_classes = (MultiPartParser, FormParser,)
queryset = Bill.objects.all()
serializer_class = BillModelSerializer
read_serializer_class = BillReadSerializer
write_serializer_class = BillWriteSerializer
scan_write_serializer_class = BillScanWriteSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly,
EditAuthority,
IsProperStatus]
@action(detail=False, methods=['get', 'post', 'put'])
def image(self, request):
...
def perform_create(self, serializer):
serializer.save(user=self.request.user)
def perform_update(self, serializer):
...
def get_serializer(self, *args,**kwargs) -> serializers.HyperlinkedModelSerializer:
result = None
if self.request.method in ['GET']:
if hasattr(self, 'read_serializer_class'):
kwargs['context'] = self.get_serializer_context()
result = self.read_serializer_class(*args,**kwargs)
elif (self.request.method in ['POST', 'PUT']) and ('data' in kwargs):
if hasattr(self, 'write_serializer_class'):
if isinstance(kwargs['data'], QueryDict):
buf = [k.replace('null', 'None') for k in kwargs['data'].keys()]
buf = [eval(i) for i in buf]
if self.request.method in ['POST']:
p_data = buf[0]['data']['0'] # type : Dict[str, Any] = List[Dict[str, Dict[Union[int, slice], Any]]
elif self.request.method in ['PUT']:
buf = list(buf[0]['data'].values())
p_data = buf[0] # type : Dict[str, Union[int, str]]
elif isinstance(kwargs['data'], Dict):
p_data = kwargs['data']
kwargs['data'] = self.prepare_data(p_data, remove_none=False)
kwargs['context'] = self.get_serializer_context()
result = self.write_serializer_class(*args,**kwargs)
if not result:
result = super().get_serializer(*args,**kwargs)
return result
def get_options(self) -> Tuple[str, Dict[str, List[Dict[str, Union[str, int]]]]]:
...
def prepare_data(self,
data: Dict[str, Union[int, str]],
remove_none: bool = True) -> Dict[str, Union[int, str]]:
if 'action' in data and data['action'] == 'upload':
return data
else:
buf = {} # buf : Dict[str, Union[int, str]]
fields_for_eval = ['supplier',
'nds_tax',
'category',
'status',
'user']
fields_for_float = ['amount', 'amountNDS', 'amountPURE']
for key, value in data.items():
if key in fields_for_eval and isinstance(value, str):
buf[key] = eval(value)
if key in fields_for_float:
buf[key] = float(value if value else 0)
if key in ['scan']:
if (not value) or value in ['None', 'null']:
buf[key] = None
for k, v in buf.items():
if not v and remove_none:
data.pop(k)
else:
data[k] = v
return data
def get_queryset(self):
queryset = Bill.objects.all()
if not self.action == 'destroy':
queryset = self.get_serializer_class().setup_eager_loading(queryset=queryset)
return queryset
class Meta:
datatables_extra_json = ('get_options', )
它在apiview类中的进程“dispatch”方法期间从list、create、update mixins调用。第一个想法是在get\ u serializer方法中实现producer并从中发送数据,但这毫无意义,因为在standart viewset处理过程中数据会重复。
如果不重写标准的viewset mixins或apiview,我就不能使用get\序列化程序-这会破坏逻辑。我就是找不到一个简单的解决办法。我有什么办法?
我想一定有办法使它简单化。如果django logpipe allready在django中高度集成,那么必须有一些选项。
暂无答案!
目前还没有任何答案,快来回答吧!