erlang 了解进程停止接收消息原因

e0bqpujr  于 2022-12-08  发布在  Erlang
关注(0)|答案(2)|浏览(230)

I have a situation where i have 3 processes:

  • one process acting as a dispatcher for messages: server
  • one process acting as a supervisor (for a worker): monitor
  • one process acting as a worker that notifies supervisor when done: worker

The server sends a request to monitor and the monitor first checks if worker is busy.If busy, monitor enqueues the message (if capacity is not reached) or else forwards it to worker . When the worker finishes processing it notifies both the client , and also the monitor
My problem is that my worker process stops responding after processing the first message

-module(mq).
-compile(export_all).

-record(monstate,{
    queue,
    qc,
    wpid,
    free=true,
    wref,
    init=false,
    frun=false
}).
-record(sstate,{
    init=false,
    mpid=null,
    mref=null
}).

-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).

createProcess({M,F,A})->
    Pid=spawn(M,F,[A]),
    Ref=erlang:monitor(process,Pid),
    {Pid,Ref}.

start()->
    spawn(?MODULE,server,[#sstate{init=false}]).

server(State=#sstate{init=I})when I=:=false ->
    {MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
    server(State#sstate{init=true,mpid=MPid,mref=MRef});

server(State=#sstate{mpid=MPid,mref=MRef})->
    receive
           {From,state}->From ! State,
                            server(State);
           {From,Message}-> MPid ! {request,{From,Message}},
                            server(State);
                
            {'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
                                            server(State#sstate{mpid=NewMPid,mref=NewMRef});
            _ ->exit(invalid_message)
                                    
    end.
  

tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
    NewQueue=queue:in(Message,Q),
    {queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.

monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});

monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
    receive
        
        {request,{From ,Message}} ->  
                                       {Result,NewState}=tryEnqueue({From,Message},MState),
                                        case Result of 
                                            queue_full -> From ! {queue_full,Message};
                                            _ -> ok
                                        end,
                                        case R of
                                            true -> self() ! {worker,{finished,R}},
                                                    monitor(NewState#monstate{frun=false});
                                            false -> monitor(NewState#monstate{frun=false})
                                        end;
                                       

        {worker,{finished,_}}-> case queue:out(Q) of
                                    {{_,Element},Rest} -> W ! Element,
                                                    monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                    {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                end;

        {'DOWN',Ref,process,_,_}->
             {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
             monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});

        _->exit(invalid_message)

    end.

worker(MPid)->
    receive 
        {From,MSG} ->
            timer:sleep(?PROC_SLEEP),
            From ! {processed,MSG},
            MPid ! {worker,{finished,MSG}},
            worker(MPid);
        _ ->exit(bad_msg)
    end.

Usage

2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok

I have added a tracer to see what is happening:

10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}

First run:

14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}     // message received my server 
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}         
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}  
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa}  // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
                           #Ref<0.3226054513.2760638467.167990>,
                           {get_until,unicode,
                               ["15",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}
15> (<0.102.0>) << timeout                      //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa}    //worker sends to self() thje message
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}}   //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}

Second run:

15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //monitor receiveing message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
                           #Ref<0.3226054513.2760638467.168007>,
                           {get_until,unicode,
                               ["16",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}

As you can see from my trace , in the first call i do not understand what happens.Does my worker get timeouted and if so why ?

P.S The frun variable is used as a flag that is true only at the first monitor iteration so that when the first item arrives the process will call itself to process it (send it to the worker) since the worker is free of duty. After the first run the monitor will dequeue items from the queue whenever the worker signals he is free.
Update

So after the helpful comments i have changed my logic a bit in the monitor so that the worker gets a message on the first run , or , after he is done and notifies the monitor , there are still items in the queue of the monitor . I still can't make it work.Where is the deadlock ?

monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
        receive
            {request,{From ,Message}} -> case FirstRun of
                                            true ->  W ! {From,Message},
                                                     monitor(MState#monstate{frun=false,free=false});                                                     
                                            false -> 
                                                     St=case tryEnqueue({From,Message},MState) of 
                                                           {queue_full,S} -> From ! {queue_full,Message},
                                                                             S;
                                                           {queued,S} -> S
                                                        end,
                                                     monitor(St)
                                             end;
                                                                        
            {worker,{finished,_}}-> case queue:out(Q) of
                                        {{_,Element},Rest} -> W ! Element,
                                                        monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                        {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                    end;

        end.
9udxz4iz

9udxz4iz1#

monitor的行为需要依赖于frun。它只需要依赖于worker是否是free。我已经更新了monitor函数,在下面的代码中反映了这一点。

-module(mq).
-compile(export_all).

-record(monstate,{
    queue,
    qc,
    wpid,
    free=true,
    wref,
    init=false
}).
-record(sstate,{
    init=false,
    mpid=null,
    mref=null
}).

-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).

createProcess({M,F,A})->
    Pid=spawn(M,F,[A]),
    Ref=erlang:monitor(process,Pid),
    {Pid,Ref}.

start()->
    spawn(?MODULE,server,[#sstate{init=false}]).

server(State=#sstate{init=I})when I=:=false ->
    {MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
    server(State#sstate{init=true,mpid=MPid,mref=MRef});

server(State=#sstate{mpid=MPid,mref=MRef})->
    receive
           {From,state}->From ! State,
                            server(State);
           {From,Message}-> MPid ! {request,{From,Message}},
                            server(State);

            {'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
                                            server(State#sstate{mpid=NewMPid,mref=NewMRef});
            _ ->exit(invalid_message)

    end.

tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
    NewQueue=queue:in(Message,Q),
    {queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.

monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new()});

monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C})->
  receive
    {request,{From ,Message}} ->
      %% check whether worker is free or not
      case F of
        true ->
          W ! {From,Message},
          monitor(MState#monstate{free=false});

        false ->
          St=case tryEnqueue({From,Message},MState) of
               {queue_full,S} ->
                 From ! {queue_full,Message},
                 S;
               {queued,S} -> S
             end,
          monitor(St)
      end;

    {worker,{finished,_}} ->
      case queue:out(Q) of
        {{_,Element},Rest} ->
          W ! Element,
          monitor(MState#monstate{free=false,queue=Rest,qc=C-1});

        {empty,Rest} ->
          monitor(MState#monstate{free=true,queue=Rest})
      end;

    {'DOWN',Ref,process,_,_} ->
      {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
      monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});

    _->exit(invalid_message)

  end.

worker(MPid)->
  receive
    {From,MSG} ->
      timer:sleep(?PROC_SLEEP),
      From ! {processed,MSG},
      MPid ! {worker,{finished,MSG}},
      worker(MPid);
    _ ->exit(bad_msg)
  end.

用法

Eshell V10.5  (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().                           
Shell got {processed,aa}
Shell got {processed,bb}
ok
9>
0sgqnhkj

0sgqnhkj2#

In your code, it seems that frun is always false after the first run:

case R of
                true -> self() ! {worker,{finished,R}},
                        monitor(NewState#monstate{frun=false});
                false -> monitor(NewState#monstate{frun=false})
            end;

Once it reaches false , no {worker, {finished, R}} message will be delivered and so no element will be extracted from the queue.

Update: Deadlock sequence:

  1. Monitor receives first job and forwards it to the worker
  2. Monitor's frun is false now
  3. Worker performs the job
  4. Worker notifies monitor that the job has finished
  5. Since monitor's queue is empty, nothing happens.
  6. Monitor receives second job, since frun is false, the job is not forwarded to the worker

相关问题