kafkaconsumer代码, Package 在uwsgi中,运行在docker中,看起来什么也没做

2cmtqfgy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(399)

我试图在uwsgi和docker容器中运行kafkaconsumer代码。
该代码在docker/uwsgi之外工作,但是一旦在docker中启动,uwsgi就不会报告错误(但是它也不会向stdin/out或日志文件进行批处理)。
因此,问题是这个使用者不向es提交任何内容,也不在容器中记录任何内容。我别无选择,所以我需要帮助。
我不明白为什么uwsgi不写/输出no活动。docker容器不包含任何日志(我检查过),我似乎无法理解为什么多处理进程不能启动。我做了lazy app config,所以所有的worker都应该独立生成。如果我运行单个或多个进程,行为是相同的,在主/非主模式下也是相同的,并且不会创建文件/var/log/uwsgi/app/myapp\u consumer.log。

ini配置:

;uWSGI instance configuration
[uwsgi]
ini = /etc/uwsgi/apps-enabled/myapp_consumer.ini
uid = www-data
gid = www-data
plugins = python3
socket = /tmp/uwsgi.sock
chdir = /opt/myapp_consumer/src
enable-threads = true
lazy-apps = true
processes = 4
threads = 2
close-on-exec = true
show-config = true
master = true
logfile = file:/var/log/uwsgi/app/myapp_consumer.log
logfile-chmod = 644
logfile-chown = www-data:www-data
wsgi-file = main.py
env = MYAPP_CONSUMER_HOME=/opt/myapp_consumer
;end of configuration

uwsgi日志


***Starting uWSGI 2.0.12-debian (64bit) on [Sun Feb 25 23:53:51 2018]***

compiled with version: 5.4.0 20160609 on 31 August 2017 21:02:04
os: Linux-4.9.60-linuxkit-aufs #1 SMP Mon Nov 6 16:00:12 UTC 2017
nodename: ecf416b71ce0
machine: x86_64
clock source: unix
pcre jit disabled
detected number of CPU cores: 4
current working directory: /
detected binary path: /usr/bin/uwsgi-core
setgid() to 33
setuid() to 33
chdir() to /opt/myapp_consumer/src
your memory page size is 4096 bytes
detected max file descriptor number: 1048576
lock engine: pthread robust mutexes
thunder lock: disabled (you can enable it with --thunder-lock)
uwsgi socket 0 bound to UNIX address /tmp/uwsgi.sock fd 3
Python version: 3.5.2 (default, Nov 23 2017, 16:37:01)  [GCC 5.4.0 20160609]
Python main interpreter initialized at 0xa3be40
python threads support enabled
your server socket listen backlog is limited to 100 connections
your mercy for graceful operations on workers is 60 seconds
mapped 415360 bytes (405 KB) for 8 cores

***Operational MODE: preforking+threaded***
***uWSGI is running in multiple interpreter mode***

spawned uWSGI master process (pid: 1)
spawned uWSGI worker 1 (pid: 7, cores: 2)
spawned uWSGI worker 2 (pid: 8, cores: 2)
spawned uWSGI worker 3 (pid: 9, cores: 2)
spawned uWSGI worker 4 (pid: 10, cores: 2)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 9 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 7 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 10 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 8 (default app)

myapp代码

请注意,myapp代码的缩写是为了隐藏客户端数据。然而,我认为问题的关键在于应用方法。。。


# !/usr/bin/env python

import logging
import time
import multiprocessing
import requests
import json

from kafka import KafkaConsumer

class AConsumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=2000)
        consumer.subscribe(['mytopic'])

        while not self.stop_event.is_set():
            for message in consumer:
                entry = json.loads(message.value)
                Transaction.commit(entry)

                if self.stop_event.is_set():
                    break

        consumer.close()

class Transaction:
    @staticmethod
    def commit(id, payload):
        sPayload = payload
        sHeaders = {'Content-Type': 'application/json'}
        sEndpoint = 'http://localhost:9200/myapp/entries/
        response = requests.post(sEndpoint, data=sPayload, headers=sHeaders)
        logging.info(response.text)

def application(env, start_response):
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Encoding", "utf-8")])
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.DEBUG
    )

    tasks = [
        AConsumer()
    ]

    for t in tasks:
        t.start()

dockerfile文件

FROM ubuntu:16.04

ENV DEBIAN_FRONTEND=noninteractive \
  TERM=linux \
  MYAPP_CONSUMER_HOME=/opt/myapp_consumer

RUN apt-get update \
  && apt-get upgrade -y \
  && apt-get install -y \
  apt-utils \
  build-essential \
  git \
  python3 \
  python3-dev \
  python3-setuptools \
  python3-pip \
  uwsgi \
  uwsgi-plugin-python3 \
  && apt-get autoremove \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

RUN pip3 install --upgrade pip
RUN pip3 install requests
RUN pip3 install xlwt
RUN pip3 install xlrd
RUN pip3 install kafka-python
RUN pip3 install -U uwsgi

# Copy our source files

COPY src ${MYAPP_CONSUMER_HOME}/src
COPY config ${MYAPP_CONSUMER_HOME}/config

RUN ln -s ${MYAPP_CONSUMER_HOME}/config/myapp_consumer.ini /etc/uwsgi/apps-enabled/
RUN chown -R www-data:www-data ${MYAPP_CONSUMER_HOME}/src

CMD ["/usr/bin/uwsgi", "--ini", "/etc/uwsgi/apps-enabled/myapp_consumer.ini"]
nwlqm0z1

nwlqm0z11#

在我的案例中,我遇到了一个类似的问题,将引导服务器的类型更改为列出帮助。所以,你的Kafka消费者可能看起来像:

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=2000)

相关问题