728x90
반응형

728x90

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다.

 

저는 cart pole 문제를 DQNDDQN 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • cart pole 규칙

우선 gymnasium에서 제공하는 cart pole 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/classic_control/cart_pole/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

위의 사진처럼 생긴 막대를 지속적으로 세우는 것이 cart pole의 가장 큰 목표 입니다.

cart pole의 자세한 규칙은 아래와 같습니다.

  1. 이동 방향좌, 우만 존재
  2. 현재 막대의 상태는 막대 위치, 속도, 각도, 각속도 총 4가지로 표현
  3. 하나의 step이 에피소드 종료되지 않으면 보상 1을 획득
  4. 각도가 좌우 12 º 이상 벗어나거나, 막대가 화면을 벗어나거나, 보상 500(혹은 200) 달성하면 에피소드 종료

저희는 위의 규칙을 잘 생각하여, 막대가 특정 각도를 넘어가거나 화면에 벗어나지 않게 500 에피소드 이상 버티는 것을 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • cart pole 환경 세팅

필요한 라이브러리를 설치한 후에는 cart pole 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

env = gym.make("CartPole-v1")

cart pole은 학습을 진행해주지 않으면 바로 에피소드가 종료가 되기 때문에 테스트를 진행해보기가 어렵습니다ㅠㅠ

어떤 식으로 진행되는지 궁금하신 분들은 전체 코드 복사 후 실행해보시면, cart pole이 어떻게 진행되는지 아실 수 있습니다!


여기까지가 cart pole 환경 세팅이었습니다.

cart pole 환경 세팅은 그렇게 어렵지 않아서, 금방 하실 수 있어요!

 

이번 포스팅에서는 DQN과 DDQN에 관한 전체 코드만 업로드하고, 자세한 포스팅은 추후에 진행하도록 하겠습니다.

두 가지 모두 500 에피소드에 달성한 모델이 다수 존재했습니다.

[DQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

[DDQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DDQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

결과물

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형

728x90

 안녕하세요! 오늘은 기존 작성한 frozen lake 문제SARSA로 진행하는 방법에 대해 포스팅 하겠습니다.

  • SARSA 란?

강화 학습의 한 방법으로, Q-learning과 비슷하지만 행동 선택이 다른 방식입니다.

Q-learning이 미래의 최적 행동을 가정하고 학습한다면, SARSA는 실제로 선택한 행동을 기반으로 학습합니다.

즉 (현재 State, 현재 Action, 현재 Reward, 다음 State, 최적의 행동)이 Q-learning이었다면,

(현재 State, 현재 Action, 현재 Reward, 다음 State, 다음 Action)이 SARSA가 되고, 각각의 앞글자를 따와 SARSA라고 이름을 붙이게 된 것입니다.

 

frozen lake를 Q-learning으로 진행하는 방법은 아래 링크에서 확인해주세요.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

frozen lake에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium frozen lake 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다. 강화 학습이란?행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.흔히 Reinforcemen

yhj9855.com


그럼 본격적으로 SARSA로 frozen lake 문제를 풀어보도록 하겠습니다.

SARSA로 frozen lake를 풀 때, 크게 두 가지를 생각하시면 됩니다.

  1. Q 값을 바탕으로 행동을 선택
  2. 선택한 행동을 SARSA 공식을 사용해서 Q 값 업데이트

위의 두 가지를 하나씩 자세히 살펴보겠습니다.

Q 값을 바탕으로 행동을 선택

해당 부분은 현재까지 업데이트 된 Q 값을 바탕으로 행동을 선택하는 것입니다.

기본적으로는 Q 값이 가장 높은 행동을 선택하면 됩니다.

 

하지만 여기에는 한 가지 문제점이 있습니다.

Q 값이 제대로 업데이트가 될 때까지 충분한 탐험을 진행하지 못했을 경우, 제대로 된 행동을 추출할 수 없다는 것입니다.

예를 들어, 초반에는 Q 값이 모두 0이기 때문에 (0, 0)에서 왼쪽으로 가는 행동만 선택하기 때문에 게임을 진행할 수가 없습니다.

 

저희는 이 문제를 해결하기 위해, ϵ-greedy 방법을 사용할 수 있습니다.

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

# 초기 값은 보통 1로 설정
epsilon = 1.0
train = True

# ϵ-greedy를 활용한 행동 선택
def select_action(state) :
	# 훈련을 할 경우에는 ϵ-greedy 방법을 사용
   	# 테스트를 진행할 때는 온전히 Q 값만 사용
   	# np.random.rand()를 넣어, 후반에도 종종 탐험을 할 수 있도록 함
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

선택한 행동을 SARSA 공식을 사용해서 Q 값 업데이트

해당 부분은 위에서 선택한 행동을 환경에서 실행해보고, 그 결과 값을 SARSA 공식에 맞게 Q 값을 업데이트 하는 것입니다.

코드로 들어가기 전에 먼저 Q 테이블을 업데이트하는 공식을 먼저 살펴보겠습니다.

해당 수식은 Q(s, a)를 업데이트 하는데, 특정 학습률 α에 있어 (1- α)만큼 현재의 Q 값 α만큼의 (보상값 r + 할인율  γ * 다음 state와 action의 Q값 Q(s', a'))를 반영한다는 의미입니다.

Q-learning을 배우신 분들은 아시겠지만, Q-learning에서의 maxQ값특정 행동 a'의 Q 값인 Q(s', a')로 바뀐 것을 알 수 있습니다.

  • 학습률 α

값이 높을수록 다음 행동 값 즉, 새로운 정보를 더 많이 반영한다는 것이고, 낮을수록 현재의 Q 값 즉, 기존의 경험을 더 많이 유지한다는 의미입니다.

  • 할인율 γ

미래 보상의 중요도를 나타내는 지표로, 보통은 미래의 보상에 너무 의존하지 않도록 1보다 약간 작은 수로 지정하는 것이 보통입니다.

  • Q(s', a')

실제로 선택한 다음 행동 a'에 대한 Q 값으로, 위의 행동 선택을 기반하여 다음 state에서 실제 action을 고른 값입니다.

이렇게 실제 행동을 기반으로 Q 값을 업데이트 하기 때문에, 안정적이지만 그만큼 느릴 수 있습니다.

하지만, frozen lake는 공간이 작은 문제라서 Q-learning과 크게 결과 차이는 없으실 거예요:)

 

해당 공식을 바탕으로 SARSA를 진행하는 코드는 아래와 같습니다.

# 학습을 진행할 때는 render 모드 비활성화
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
# 환경의 행동 범위 : 여기서는 상, 하, 좌, 우 총 4개
action_size = env.action_space.n

# defaultdict은 키가 없을 때 자동으로 기본값을 생성하기 때문에 강화 학습에서 많이 사용
Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
# 총 학습을 진행할 에피소드 수
max_episode = 10000

def learn() :
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때 reset
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
        	# Q 테이블을 바탕으로 action을 고르는 함수
            action = select_action(state)
            # state, reward, done 외 사용하지 않기 때문에 _ 처리
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        # 50번째 에피소드 마다 ϵ 값을 줄여줌
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

 

위의 두 가지 과정을 합치면 SARSA로 frozen lake를 풀 수 있는 코드가 완성됩니다!

학습 후 테스트를 진행하고 싶으신 경우에는 render를 킨 환경을 다시 세팅해서 해주시면 됩니다.

전체 코드

행동 선택, 학습, 테스트 과정을 모두 포함한 전체 코드는 아래와 같습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np

# 미끄러짐 옵션 True/False 선택 가능
is_slippery = True
# 8x8 중에 선택 가능
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        state, _ = env.reset()
        done = False
        all_reward = 0
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

# 학습한 Q를 바탕으로 frozen lake 테스트
def testing_after_learning():
	# render를 켜야 제대로 학습이 되었는지 확인할 수 있음
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

 

테스트를 진행하면서 is_slippery 옵션을 껐을 경우에는 1.0 보상을 받으면 성공이고, is_slippery 옵션을 켰을 경우에는 70% 이상 1.0 보상을 받으면 성공이라고 보실 수 있습니다.

 

추가로  is_slippery 옵션을 켰을 경우에는 학습을 많이 진행해야 어느 정도 수렴하시는 걸 보실 수 있습니다!

아무래도 model-free로 진행을 하니까 많이 느리더라구요ㅠㅠ

  • model-based

model-free가 아닌 어느 정도 model-based로 빠르게 학습을 하고 싶으신 경우 아래 상황을 고려할 수 있습니다.

  1. 행동 한 번을 진행할 때마다 reward에 - 진행
    → RL이 최단 경로로 진행하려는 경향을 학습할 수 있음
  2. 구멍에 빠졌을 경우, reward에 크게 - 진행
    → 구멍에 빠지지 않는 쪽으로 빠르게 학습할 수 있음
  3. 도착했을 경우, reward를 크게 추가
    → 도착 지점에 확실히 도착하기 위해 큰 reward를 지급

그 외에도 벽에 부딪히거나 하는 등 맵을 알고 있기 때문에 환경에 맞게 reward를 추가로 주거나 마이너스를 진행하여, model-based 모델을 만들 수도 있습니다.

 

그래도 강화 학습을 제대로 알기 위해서는 model-free로 진행해보는 것을 추천드립니다!

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다.

 

  • Q-learning이란?

강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.

Q 테이블 내 값은 특정 상황에서 어떤 행동을 했을 때의 보상 값의 큰 정도를 나타내는 것으로, 학습이 진행되면서 해당 값들을 업데이트 하여 최적의 행동을 찾는 것입니다.

 

frozen lake에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium frozen lake 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다. 강화 학습이란?행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.흔히 Reinforcemen

yhj9855.com


그럼 본격적으로 Q-learing으로 frozen lake 문제를 풀어보도록 하겠습니다.

Q-learning으로 frozen lake를 풀 때, 크게 두 가지를 생각하시면 됩니다.

  1. Q 값을 바탕으로 행동을 선택
  2. 선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

위의 두 가지를 하나씩 자세히 살펴보겠습니다.

Q 값을 바탕으로 행동을 선택

해당 부분은 현재까지 업데이트 된 Q 값을 바탕으로 행동을 선택하는 것입니다.

기본적으로는 Q 값이 가장 높은 행동을 선택하면 됩니다.

 

하지만 여기에는 한 가지 문제점이 있습니다.

Q 값이 제대로 업데이트가 될 때까지 충분한 탐험을 진행하지 못했을 경우, 제대로 된 행동을 추출할 수 없다는 것입니다.

예를 들어, 초반에는 Q 값이 모두 0이기 때문에 (0, 0)에서 왼쪽으로 가는 행동만 선택하기 때문에 게임을 진행할 수가 없습니다.

 

저희는 이 문제를 해결하기 위해, ϵ-greedy 방법을 사용할 수 있습니다.

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

# 초기 값은 보통 1로 설정
epsilon = 1.0
train = True

# ϵ-greedy를 활용한 행동 선택
def select_action(state) :
	# 훈련을 할 경우에는 ϵ-greedy 방법을 사용
   	# 테스트를 진행할 때는 온전히 Q 값만 사용
   	# np.random.rand()를 넣어, 후반에도 종종 탐험을 할 수 있도록 함
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

 

선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

해당 부분은 위에서 선택한 행동을 환경에서 실행해보고, 그 결과 값을 Q-learning 공식에 맞게 Q 값을 업데이트 하는 것입니다.

코드로 들어가기 전에 먼저 Q 테이블을 업데이트하는 공식을 먼저 살펴보겠습니다.

해당 수식은 Q(s, a)를 업데이트 하는데, 특정 학습률 α에 있어 (1- α)만큼 현재의 Q 값α만큼의 (보상값 r + 할인율  γ * 다음 state의 가장 높은 Q값 maxQ(s', a'))를 반영한다는 의미입니다.

  • 학습률 α

값이 높을수록 다음 행동 값 즉, 새로운 정보를 더 많이 반영한다는 것이고, 낮을수록 현재의 Q 값 즉, 기존의 경험을 더 많이 유지한다는 의미입니다.

  • 할인율 γ

미래 보상의 중요도를 나타내는 지표로, 보통은 미래의 보상에 너무 의존하지 않도록 1보다 약간 작은 수로 지정하는 것이 보통입니다.

  • maxQ(s', a')

다음 상태인 s'에서 가능한 모든 행동 중 가장 높은 Q 값을 의미하며, s'은 현재 state에서 위에서 고른 행동을 실행한 결과 값이라고 보시면 됩니다.

 

이제 해당 공식을 바탕으로 Q-learning을 하는 코드는 아래와 같습니다.

# 학습을 진행할 때는 render 모드 비활성화
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
# 환경의 행동 범위 : 여기서는 상, 하, 좌, 우 총 4개
action_size = env.action_space.n

# defaultdict은 키가 없을 때 자동으로 기본값을 생성하기 때문에 강화 학습에서 많이 사용
Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
# 총 학습을 진행할 에피소드 수
max_episode = 10000

def learn() :
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때 reset
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
        	# Q 테이블을 바탕으로 action을 고르는 함수
            action = select_action(state)
            # state, reward, done 외 사용하지 않기 때문에 _ 처리
            new_state, reward, done, _, _ = env.step(action)
            # Q-learning 공식
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        # 50번째 에피소드 마다 ϵ 값을 줄여줌
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

위의 두 가지 과정을 합치면 Q-learing으로 frozen lake를 풀 수 있는 코드가 완성됩니다!

학습 후 테스트를 진행하고 싶으신 경우에는 render를 킨 환경을 다시 세팅해서 해주시면 됩니다.

 

전체 코드

행동 선택, 학습, 테스트 과정을 모두 포함한 전체 코드는 아래와 같습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np

# 미끄러짐 옵션 True/False 선택 가능
is_slippery = True
# 8x8 중에 선택 가능
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        state, _ = env.reset()
        done = False
        all_reward = 0
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

# 학습한 Q를 바탕으로 frozen lake 테스트
def testing_after_learning():
	# render를 켜야 제대로 학습이 되었는지 확인할 수 있음
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

 

 

테스트를 진행하면서 is_slippery 옵션을 껐을 경우에는 1.0 보상을 받으면 성공이고, is_slippery 옵션을 켰을 경우에는 70% 이상 1.0 보상을 받으면 성공이라고 보실 수 있습니다.

 

추가로  is_slippery 옵션을 켰을 경우에는 학습을 많이 진행해야 어느 정도 수렴하시는 걸 보실 수 있습니다!

아무래도 model-free로 진행을 하니까 많이 느리더라구요ㅠㅠ

  • model-based

model-free가 아닌 어느 정도 model-based로 빠르게 학습을 하고 싶으신 경우 아래 상황을 고려할 수 있습니다.

  1. 행동 한 번을 진행할 때마다 reward에 - 진행
    → RL이 최단 경로로 진행하려는 경향을 학습할 수 있음
  2. 구멍에 빠졌을 경우, reward에 크게 - 진행
    → 구멍에 빠지지 않는 쪽으로 빠르게 학습할 수 있음
  3. 도착했을 경우, reward를 크게 추가
    → 도착 지점에 확실히 도착하기 위해 큰 reward를 지급

그 외에도 벽에 부딪히거나 하는 등 맵을 알고 있기 때문에 환경에 맞게 reward를 추가로 주거나 마이너스를 진행하여, model-based 모델을 만들 수도 있습니다.

 

그래도 강화 학습을 제대로 알기 위해서는 model-free로 진행해보는 것을 추천드립니다!

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다.

 

  • 강화 학습이란?

행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.

흔히 Reinforcement Learning 줄여서, RL이라고 부르는 강화 학습은 컴퓨터가 스스로 경험을 쌓으면서 어떤 행동이 가장 좋은 보상을 얻는지 배우는 과정이라고 볼 수 있습니다.

 

저는 frozen lake 문제를 Q-learningSARSA 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • frozen lake 규칙

우선 gymnasium에서 제공하는 frozen lake 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/toy_text/frozen_lake/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

frozen lake의 기본 맵은 아래 사진처럼 생겼습니다.

  1. frozen lake의 기본 맵은 4x4 혹은 8x8로 존재
  2. 맵 내 구멍이 존재
    → 구멍의 수나 위치는 조절할 수 없음
  3. 선물에 있는 곳에 도착하거나, 구멍에 빠지면 에피소드 종료
    물에 있는 곳에 도착해야만 보상 1을 획득할 수 있음
  4. 움직임은 상, 하, 좌, 우 존재
  5. 얼음 내 미끄러짐을 키고 끌 수 있음
    → 미끄러짐을 킬 경우, 각 얼음마다 특정 확률로 다른 곳으로 움직이는 확률이 존재
    → 각 얼음마다 존재하는 미끄러질 확률을 조절할 수 없음

저희는 위의 규칙을 잘 생각하여, 구멍에 빠지지 않고 선물에 도착하는 경로를 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • frozen lake 환경 세팅

필요한 라이브러리를 설치한 후에는 frozen lake 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

# 미끄러지는 얼음을 만들지 결정하는 변수
is_slippery = False
# 맵 사이즈
map_size = '4x4'
# frozen lake 환경 설정
# render 활성화 환경
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
# 환경을 초기화하는 함수
env.reset()

env.reset()의 경우 기본 기능은 환경을 초기화하는 함수지만, 환경을 세팅한 다음에 해당 코드를 부르지 않으면 오류가 발생하니 반드시 환경 사용 전에 env.reset()을 먼저 불러주세요!

  • frozen lake 테스트 진행하기

이제 만든 환경을 바탕으로 gymnasium에서 제공하는 frozen lake 테스트를 진행해봅시다!

테스트를 진행하는 코드는 아래와 같습니다.

while True:
    # 0 : 왼쪽, 1 : 아래, 2 : 오른쪽, 3 : 위
    action = input("이동할 방향: ")
    if action not in ['0','1','2','3']:
        continue
    action = int(action)
    # step() : 환경에 액션을 넣고 실행하는 함수
    state, reward, done, info, prob = env.step(action)
    print("위치: ", state, "행동: ", action, "보상: ", reward)
    print()
    if done:
        print("에피소드 종료", reward)
        break

해당 코드를 실행해보면, 4x4 맵이 있는 창에 입력한 행동대로 움직이는 것을 확인할 수 있습니다.

구멍에 빠지거나 선물에 도착하기 전에는 계속 움직일 수 있으니, 환경을 이해하기 위해 편하게 테스트 진행해보세요!

맵을 키우거나 미끄러지는 얼음을 키고 테스트 해보시는 것도 추천드립니다.

 


여기까지가 frozen lake 환경 세팅이었습니다.

이번 포스팅에서는 Q-learning과 SARSA에 관한 model-free 전체 코드만 업로드 되어 있습니다.

두 가지 모두 is_slippery가 False일 경우에는 1.0 만점을 받았고, is_silppery가 True일 경우에는 0.8 이상의 보상을 획득했습니다.

 

Q-learning에 관한 자세한 내용이 궁금하신 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

SARSA에 관한 자세한 내용이 궁금하진 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-SARSA

 

[Q-learning 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '8x8'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            # Q 러닝
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

[SARSA 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

 

728x90
반응형

+ Recent posts