虽然 action_space 和 observation_space 已经在 process_simulation 类的 `__init__` 方法中定义,我的项目仍然遇到了下面的问题,希望能得到帮助。我想使用 OpenAI Gym 和 Python 为深度强化学习定制一个环境,并实现近端策略优化(PPO)模型来训练智能体。
`from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
class process_simulation(Env):
    """Custom Gym environment describing a production line for products A, B and C.

    The constructor stores the simulation parameters, the per-machine job
    counters and the pricing counters, and publishes ``observation_space``
    and ``action_space`` so that training code can size its networks.

    NOTE(review): ``_run_machine_1`` calls ``self.timeout`` and uses
    ``raw_material_*`` stores exposing ``get``/``put``; none of them is
    created in this class. They look like SimPy ``Environment`` /
    ``Container`` objects -- confirm where they are supposed to come from.
    """

    def __init__(self):
        """Initialise every parameter and counter and define the Gym spaces.

        The method must be spelled ``__init__`` (double underscores);
        otherwise Python never runs it and the instance has no
        ``observation_space`` / ``action_space``.
        """
        super().__init__()

        # Simulation clock.
        self.simulation_duration = 60
        self.time_step = 1
        self.next_time_stop = 0
        self.setup_time = 0

        # Buffers and initial values.
        self.buffer_capacity = 99999
        self.initial_buffer = 0
        self.initial_reward = 0

        # Process times per product; machine 1 reads index 0 of each list.
        self.process_time_of_product_a = [5, 4, 3, 3, 1]
        self.process_time_of_product_b = [1, 2, 3, 4, 5]
        self.process_time_of_product_c = [3, 5, 1, 4, 2]
        # Multiplied by the process time to obtain the sigma given to
        # random.gauss in _apply_variability (0 disables the noise).
        self.default_standard_deviation = .0

        # Queue of requested product types ('A', 'B' or 'C'), in arrival order.
        self.fifo_list = []
        self.time_to_get_raw_material = 1

        # Cost parameters.
        self.cost_for_machining = 0
        self.cost_for_storage = 1
        self.cost_for_raw_material = 50

        # Number of jobs each machine has completed so far.
        self.finished_jobs_machine1 = 0
        self.finished_jobs_machine2 = 0
        self.finished_jobs_machine3 = 0
        self.finished_jobs_machine4 = 0
        self.finished_jobs_machine5 = 0

        # Counters maintained by _calculate_sales_value.
        self.consecutive_production_of_product_a = 0
        self.consecutive_production_of_product_b = 0
        self.consecutive_production_of_product_c = 0
        self.consecutive_switches_to_product_a = 0
        self.consecutive_switches_to_product_b = 0
        self.consecutive_switches_to_product_c = 0
        self.consecutive_idle_states = 0

        # Agent bookkeeping.
        self.rl_agent_actions = [0, 1, 2, 3]
        self.rl_agent_reward = self.initial_reward
        self.state = dict()
        self.observation_size = 3
        self.action_size = len(self.rl_agent_actions)

        # Derive both spaces from the sizes declared above so they cannot
        # drift apart (previously Box had shape (1,) and Discrete had 3
        # actions, contradicting observation_size and rl_agent_actions).
        # NOTE(review): the 0..100 bounds are applied to every observation
        # component -- confirm against the real observation contents.
        self.observation_space = Box(
            low=0,
            high=100,
            shape=(self.observation_size,),
            dtype=np.float32,
        )
        self.action_space = Discrete(self.action_size)

    def _get_next_job(self, finished_jobs):
        """Return the queued product type at position *finished_jobs*.

        Raises:
            IndexError: if *finished_jobs* is outside ``fifo_list``.
        """
        return self.fifo_list[finished_jobs]

    def _apply_variability(self, process_time):
        """Return *process_time* with Gaussian noise applied, never below zero.

        The standard deviation is ``process_time * default_standard_deviation``.
        The sample is clamped at zero because the result is used as a
        timeout delay, and a negative delay is not a valid duration.
        """
        sampled = random.gauss(
            process_time, process_time * self.default_standard_deviation
        )
        return max(0.0, sampled)

    def _calculate_sales_value(self, product_type):
        """Update the production/switch counters and return the sales value.

        The first unit after a change of product raises that product's
        switch counter and returns ``min(400, 200 * 1.5 ** switches)``.
        Any further consecutive unit takes the ``else`` branch.

        NOTE(review): the ``else`` branches look inconsistent and are kept
        exactly as found. The B and C branches reset product A's switch
        counter, and the exponents differ between products
        (``switches - 2``, ``switches - 1``, ``switches``). Confirm the
        intended pricing rule before relying on these values.

        Raises:
            ValueError: if *product_type* is not 'A', 'B' or 'C'.
        """
        if product_type == 'A':
            self.consecutive_production_of_product_a += 1
            self.consecutive_production_of_product_b = 0
            self.consecutive_production_of_product_c = 0
            if self.consecutive_production_of_product_a == 1:
                self.consecutive_switches_to_product_a += 1
                return min(400, 200 * (1.5 ** self.consecutive_switches_to_product_a))
            self.consecutive_switches_to_product_a = 0
            return max(100, 200 * (.5 ** (self.consecutive_switches_to_product_a - 2)))
        if product_type == 'B':
            self.consecutive_production_of_product_b += 1
            self.consecutive_production_of_product_a = 0
            self.consecutive_production_of_product_c = 0
            if self.consecutive_production_of_product_b == 1:
                self.consecutive_switches_to_product_b += 1
                return min(400, 200 * (1.5 ** self.consecutive_switches_to_product_b))
            # NOTE(review): resets A's counter, not B's -- possible copy-paste slip.
            self.consecutive_switches_to_product_a = 0
            return max(100, 200 * (.5 ** (self.consecutive_switches_to_product_b - 1)))
        if product_type == 'C':
            self.consecutive_production_of_product_c += 1
            self.consecutive_production_of_product_a = 0
            self.consecutive_production_of_product_b = 0
            if self.consecutive_production_of_product_c == 1:
                self.consecutive_switches_to_product_c += 1
                return min(400, 200 * (1.5 ** self.consecutive_switches_to_product_c))
            # NOTE(review): resets A's counter, not C's -- possible copy-paste slip.
            self.consecutive_switches_to_product_a = 0
            return max(100, 200 * (.5 ** self.consecutive_switches_to_product_c))
        raise ValueError('{} --> requested type is not in the list of allowed product types [A, B , C]'.format(product_type))

    def _machine_1_resources(self, product_type):
        """Return ``(process_times, input_store, output_store)`` for machine 1.

        Raises:
            ValueError: if *product_type* is not 'A', 'B' or 'C'.
        """
        if product_type == 'A':
            return (
                self.process_time_of_product_a,
                self.raw_material_of_product_a,
                self.raw_material_machine1_product_a,
            )
        if product_type == 'B':
            return (
                self.process_time_of_product_b,
                self.raw_material_of_product_b,
                self.raw_material_machine1_product_b,
            )
        if product_type == 'C':
            return (
                self.process_time_of_product_c,
                self.raw_material_of_product_c,
                self.raw_material_machine1_product_c,
            )
        raise ValueError('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))

    def _run_machine_1(self):
        """Generator that processes the jobs in ``fifo_list`` on machine 1.

        It runs forever and yields the objects returned by ``self.timeout``
        and by the stores' ``get``/``put`` calls. With nothing left to do it
        yields one ``time_step`` timeout. Otherwise, for the next job, it:

        1. yields a setup timeout if this is the machine's first job or the
           product type differs from the previous job,
        2. yields the raw-material timeout and takes one unit from the input store,
        3. deducts the machining cost from ``rl_agent_reward``,
        4. yields the (noisy) process-time timeout and puts one unit into
           the output store,
        5. increments ``finished_jobs_machine1``.

        Raises:
            ValueError: if the next job is not 'A', 'B' or 'C'.
        """
        while True:
            queued = len(self.fifo_list)
            done = self.finished_jobs_machine1
            if queued == 0 or queued == done:
                # Nothing to process yet: wait one step and look again.
                yield self.timeout(self.time_step)
                continue

            next_job = self._get_next_job(done)
            # Check ``done == 0`` explicitly: indexing ``fifo_list[-1]`` would
            # compare the first job with the *last* queued one and could skip
            # the initial setup.
            if done == 0 or self.fifo_list[done - 1] != next_job:
                yield self.timeout(self.setup_time)

            process_times, source, sink = self._machine_1_resources(next_job)
            yield self.timeout(self.time_to_get_raw_material)
            yield source.get(1)
            # The constructor defines ``rl_agent_reward``; ``self.reward`` was
            # never initialised and raised AttributeError here.
            self.rl_agent_reward -= self.cost_for_machining * process_times[0]
            yield self.timeout(self._apply_variability(process_times[0]))
            yield sink.put(1)
            self.finished_jobs_machine1 += 1
# Instantiate the environment and read the observation/action sizes from its spaces.
env = process_simulation()
# Length of the first axis of the observation space's shape.
observation_dimensions = env.observation_space.shape[0]
# Number of actions reported by the discrete action space.
num_actions = env.action_space.n`
1条答案
按热度按时间yrefmtwq1#
问题出在 `__init__` 函数上,我已经解决了。