I have a situation where i have 3 processes:
- one process acting as a dispatcher for messages:
server
- one process acting as a supervisor (for a worker):
monitor
- one process acting as a worker that notifies supervisor when done:
worker
The server
sends a request to monitor and the monitor
first checks if worker
is busy.If busy, monitor
enqueues the message (if capacity is not reached) or else forwards it to worker
. When the worker finishes processing it notifies both the client , and also the monitor
My problem is that my worker process stops responding after processing the first message
-module(mq).
-compile(export_all).
-record(monstate,{
queue,
qc,
wpid,
free=true,
wref,
init=false,
frun=false
}).
-record(sstate,{
init=false,
mpid=null,
mref=null
}).
-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).
createProcess({M,F,A})->
Pid=spawn(M,F,[A]),
Ref=erlang:monitor(process,Pid),
{Pid,Ref}.
start()->
spawn(?MODULE,server,[#sstate{init=false}]).
server(State=#sstate{init=I})when I=:=false ->
{MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
receive
{From,state}->From ! State,
server(State);
{From,Message}-> MPid ! {request,{From,Message}},
server(State);
{'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{mpid=NewMPid,mref=NewMRef});
_ ->exit(invalid_message)
end.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
NewQueue=queue:in(Message,Q),
{queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
receive
{request,{From ,Message}} ->
{Result,NewState}=tryEnqueue({From,Message},MState),
case Result of
queue_full -> From ! {queue_full,Message};
_ -> ok
end,
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
{'DOWN',Ref,process,_,_}->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
_->exit(invalid_message)
end.
worker(MPid)->
receive
{From,MSG} ->
timer:sleep(?PROC_SLEEP),
From ! {processed,MSG},
MPid ! {worker,{finished,MSG}},
worker(MPid);
_ ->exit(bad_msg)
end.
Usage
2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok
I have added a tracer to see what is happening:
10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}
First run:
14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa} // message received my server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa} // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
#Ref<0.3226054513.2760638467.167990>,
{get_until,unicode,
["15",62,32],
erl_scan,tokens,
[1,[text]]}}
15> (<0.102.0>) << timeout //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa} //worker sends to self() thje message
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}} //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}
Second run:
15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //monitor receiveing message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
#Ref<0.3226054513.2760638467.168007>,
{get_until,unicode,
["16",62,32],
erl_scan,tokens,
[1,[text]]}}
As you can see from my trace , in the first call i do not understand what happens.Does my worker
get timeouted and if so why ?
P.S The frun
variable is used as a flag that is true only at the first monitor
iteration so that when the first item arrives the process will call itself to process it (send it to the worker) since the worker is free of duty. After the first run the monitor
will dequeue items from the queue whenever the worker
signals he is free.
Update
So after the helpful comments i have changed my logic a bit in the monitor
so that the worker
gets a message on the first run , or , after he is done and notifies the monitor
, there are still items in the queue of the monitor
. I still can't make it work.Where is the deadlock ?
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
receive
{request,{From ,Message}} -> case FirstRun of
true -> W ! {From,Message},
monitor(MState#monstate{frun=false,free=false});
false ->
St=case tryEnqueue({From,Message},MState) of
{queue_full,S} -> From ! {queue_full,Message},
S;
{queued,S} -> S
end,
monitor(St)
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
end.
2条答案
按热度按时间9udxz4iz1#
monitor
的行为需要依赖于frun
。它只需要依赖于worker
是否是free
。我已经更新了monitor
函数,在下面的代码中反映了这一点。用法
0sgqnhkj2#
In your code, it seems that
frun
is always false after the first run:Once it reaches
false
, no{worker, {finished, R}}
message will be delivered and so no element will be extracted from the queue.Update: Deadlock sequence:
frun
is false nowfrun
is false, the job is not forwarded to the worker