阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
算是自己写的第一个强化学习环境目前还有很多纰漏逐步改进ing。
希望能在两周内施工完成。
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt
import time
from tqdm import tqdm
import pandas as pd
def moving_average(data, window_size):
    """Return the simple moving average of ``data``.

    :param data: sequence of numbers to smooth
    :param window_size: number of consecutive samples averaged per point
    :return: list of ``len(data) - window_size + 1`` window means
    :raises ValueError: if ``window_size`` is non-positive or exceeds ``len(data)``
    """
    if window_size <= 0:
        raise ValueError("Window size should be greater than 0.")
    if window_size > len(data):
        raise ValueError("Window size should not be greater than the length of data.")
    # Prefix sums: prefix_sums[k] is the sum of the first k samples, so each
    # window sum is a single subtraction.
    prefix_sums = [0]
    running_total = 0
    for value in data:
        running_total += value
        prefix_sums.append(running_total)
    window_count = len(data) - window_size + 1
    return [(prefix_sums[i + window_size] - prefix_sums[i]) / window_size
            for i in range(window_count)]
def plot_data(data, title="Data Plot", x_label="X-axis", y_label="Y-axis"):
    """Render ``data`` as a single line chart and display it.

    :param data: list of ints or floats plotted against their indices
    :param title: chart title
    :param x_label: caption for the x-axis
    :param y_label: caption for the y-axis
    :return: None; opens an interactive window via ``plt.show()``
    """
    figure = plt.figure(figsize=(10, 5))
    axes = figure.add_subplot(1, 1, 1)
    axes.plot(data)
    axes.set_title(title)
    axes.set_xlabel(x_label)
    axes.set_ylabel(y_label)
    axes.grid(True, which='both', linestyle='--', linewidth=0.5)
    figure.tight_layout()
    plt.show()
class TransportMatchingEnv:
    """Toy RL environment that matches trucks (drivers) to goods.

    Observation: concatenation of the negotiation matrix, the distance
    matrix, per-good time preferences / expected prices / special
    requirements, and per-driver availability / special capabilities.
    Action: one flat integer encoding (good, driver, price, time).
    """

    def __init__(self, num_drivers=5, num_goods=5, max_price=10, max_time=5):
        """
        :param num_drivers: number of trucks
        :param num_goods: number of goods
        :param max_price: exclusive upper bound on the proposed price
        :param max_time: exclusive upper bound on the proposed time slot
        """
        self.num_drivers = num_drivers
        self.num_goods = num_goods
        self.max_price = max_price
        self.max_time = max_time
        self.action_dim = self.num_drivers * self.num_goods * self.max_price * self.max_time
        self.current_negotiation = None
        # reset() initialises every random attribute.  The original code
        # re-randomised them again AFTER reset(), which left combined_state
        # stale relative to the attributes and set
        # driver_special_capabilities to a scalar instead of a per-driver
        # array (crashing compute_reward's indexing).
        self.combined_state = self.reset()

    def _build_state(self):
        """Flatten every environment attribute into one observation vector."""
        return np.concatenate((
            self.current_negotiation.flatten(),
            self.distance_matrix.flatten(),
            self.goods_time_preferences,
            self.goods_expected_prices,
            self.driver_availabilities,
            self.goods_special_requirements,
            self.driver_special_capabilities
        ))

    def decode_action(self, encoded_action):
        """Decode a flat action index into its human-readable components.

        :param encoded_action: integer in ``[0, action_dim)``
        :return: tuple ``(driver_index, good_index, price, time)``
        :raises ValueError: if the index lies outside the action space
        """
        total_actions_for_price_time = self.max_price * self.max_time
        total_actions_per_good = self.num_drivers * total_actions_for_price_time
        total_actions = self.num_goods * total_actions_per_good
        if encoded_action >= total_actions:
            raise ValueError("Encoded action is out of bounds!")
        good_index = encoded_action // total_actions_per_good
        residual = encoded_action % total_actions_per_good
        driver_index = residual // total_actions_for_price_time
        residual = residual % total_actions_for_price_time
        price = residual // self.max_time
        time = residual % self.max_time
        return driver_index, good_index, price, time

    def compute_reward(self, driver_index, good_index, price, time):
        """Score one (driver, good, price, time) proposal.

        reward = -distance - 2*|time mismatch| - |price mismatch|
                 + 10*driver_availability - 20 if a special requirement
                 is not met by the driver.

        :param driver_index: index of the truck
        :param good_index: index of the good
        :param price: proposed price
        :param time: proposed delivery time slot
        :return: scalar reward
        """
        distance = self.distance_matrix[good_index][driver_index]
        distance_factor = -distance
        delivery_time_preference = self.goods_time_preferences[good_index]
        time_penalty = -abs(delivery_time_preference - time) * 2
        expected_price = self.goods_expected_prices[good_index]
        price_factor = -abs(price - expected_price)
        availability_factor = self.driver_availabilities[driver_index] * 10
        good_requirement = self.goods_special_requirements[good_index]
        driver_capability = self.driver_special_capabilities[driver_index]
        requirement_factor = 0
        # flat penalty when the good needs a capability the driver lacks
        if good_requirement > 0 and driver_capability < good_requirement:
            requirement_factor = -20
        return (distance_factor + time_penalty + price_factor
                + availability_factor + requirement_factor)

    def reset(self):
        """Re-randomise the environment and return the initial observation.

        NOTE: the original implementation called ``random.seed(0)`` here,
        which never affected ``np.random`` (the RNG actually drawing the
        data below) and silently re-seeded the global ``random`` module --
        used elsewhere for replay sampling -- on every episode.  Removed.

        :return: flat observation vector (see ``_build_state``)
        """
        self.current_negotiation = np.zeros((self.num_goods, self.num_drivers))
        self.distance_matrix = np.random.randint(0, 100, (self.num_goods, self.num_drivers))
        self.goods_time_preferences = np.random.randint(0, self.max_time, self.num_goods)
        self.goods_expected_prices = np.random.randint(0, self.max_price, self.num_goods)
        self.driver_availabilities = np.random.choice([0, 1], self.num_drivers)
        self.goods_special_requirements = np.random.choice([0, 1], self.num_goods)
        self.driver_special_capabilities = np.random.choice([0, 1], self.num_drivers)
        return self._build_state()

    def driver_satisfaction(self, fee_received, expected_fee, distance_travelled, max_distance, wait_time,
                            max_wait_time,
                            goods_condition):
        """Satisfaction score (0-100 scale) from the truck driver's view.

        :param fee_received: fee actually received
        :param expected_fee: fee the driver expected
        :param distance_travelled: distance driven
        :param max_distance: maximum possible distance
        :param wait_time: time spent waiting
        :param max_wait_time: maximum possible wait time
        :param goods_condition: 'good' earns the goods bonus
        :return: weighted sum of price/distance/wait/goods components
        """
        price_satisfaction = (fee_received / expected_fee) * 40
        distance_satisfaction = ((max_distance - distance_travelled) / max_distance) * 30
        wait_satisfaction = ((max_wait_time - wait_time) / max_wait_time) * 20
        goods_satisfaction = 10 if goods_condition == 'good' else 0
        return price_satisfaction + distance_satisfaction + wait_satisfaction + goods_satisfaction

    def shipper_satisfaction(self, fee_paid, expected_fee, delivery_time, expected_delivery_time, goods_condition,
                             driver_service_quality):
        """Satisfaction score (0-100 scale) from the shipper's view.

        :param fee_paid: fee actually paid
        :param expected_fee: fee the shipper expected
        :param delivery_time: actual transport time
        :param expected_delivery_time: expected transport time
        :param goods_condition: 'good' earns the goods bonus
        :param driver_service_quality: service quality on a 0-100 scale
        :return: weighted sum of price/time/goods/service components
        """
        price_satisfaction = (expected_fee / fee_paid) * 30
        time_satisfaction = ((expected_delivery_time - delivery_time) / expected_delivery_time) * 30
        goods_satisfaction = 20 if goods_condition == 'good' else 0
        service_satisfaction = driver_service_quality * 20 / 100
        return price_satisfaction + time_satisfaction + goods_satisfaction + service_satisfaction

    def successOrFailure(self):
        # TODO(review): stub -- negotiation success/failure logic is not
        # implemented yet; always reports success.
        return 1

    def step(self, encoded_action):
        """Apply one negotiation proposal.

        :param encoded_action: flat action index to be decoded
        :return: ``(observation, reward, done, info_dict)``; ``done`` is a
            numpy bool (callers in this file invoke ``.item()`` on it)
        """
        driver_index, good_index, price, time = self.decode_action(encoded_action)
        # decode_action guarantees price < max_price and time < max_time, but
        # keep the guard with a defined fallback: the original left `reward`
        # unbound (NameError) if the condition ever failed.
        reward = 0
        if price <= self.max_price and time <= self.max_time:
            self.current_negotiation[good_index][driver_index] = 1
            reward = self.compute_reward(driver_index, good_index, price, time)
        combined_state = self._build_state()
        # episode ends once every good has at least one agreed match
        done = np.sum(self.current_negotiation) == self.num_goods
        return combined_state, reward, done, {}

    def render(self):
        """Print the current negotiation matrix."""
        print(self.current_negotiation)
class RandomAgent:
    """Baseline agent that picks actions uniformly at random."""

    def __init__(self, action_dim):
        """
        :param action_dim: size of the discrete action space
        """
        self.action_dim = action_dim

    def act(self):
        """Return a uniformly random action index in ``[0, action_dim)``."""
        return np.random.choice(self.action_dim)
class DQN(nn.Module):
    """Three-layer fully connected Q-network (state -> one Q-value per action)."""

    def __init__(self, input_dim, output_dim):
        """
        :param input_dim: size of the flattened state vector
        :param output_dim: number of discrete actions
        """
        super(DQN, self).__init__()
        hidden = 128
        layers = [
            nn.Linear(input_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, output_dim),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of state vectors to per-action Q-values."""
        return self.fc(x)
class DQNAgent:
    """DQN agent: epsilon-greedy policy, replay memory, and a target network."""

    def __init__(self, input_dim, action_dim, gamma=0.99, epsilon=0.99, lr=0.001, device=None):
        """
        :param input_dim: size of the flattened state vector
        :param action_dim: number of discrete actions
        :param gamma: discount factor for future rewards
        :param epsilon: initial exploration probability
        :param lr: Adam learning rate
        :param device: torch device to run on; auto-detects CUDA when None.
            (The original read a module-level ``device`` global that only
            exists when the file runs as a script, so importing it raised
            NameError.)
        """
        self.input_dim = input_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.device = device if device is not None else torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")
        self.network = DQN(input_dim, action_dim).float().to(self.device)
        self.target_network = DQN(input_dim, action_dim).float().to(self.device)
        self.target_network.load_state_dict(self.network.state_dict())
        self.optimizer = optim.Adam(self.network.parameters(), lr=self.lr)
        # bounded replay buffer: oldest transitions fall off the left end
        self.memory = deque(maxlen=2000)

    def act(self, state):
        """Choose an action: random with probability epsilon, else argmax-Q.

        :param state: flat observation vector
        :return: integer action index
        """
        if np.random.random() > self.epsilon:
            # np.asarray + as_tensor avoids the slow (and warning-prone)
            # torch.tensor([ndarray]) construction the original used
            state_t = torch.as_tensor(
                np.asarray(state, dtype=np.float32)).unsqueeze(0).to(self.device)
            with torch.no_grad():
                return self.network(state_t).argmax().item()
        return np.random.choice(self.action_dim)

    def remember(self, state, action, reward, next_state, done):
        """Store one (s, a, r, s', done) transition in the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def train(self, batch_size=64):
        """Sample a minibatch and take one TD gradient step.

        No-op until the buffer holds at least ``batch_size`` transitions.
        :param batch_size: number of transitions per update
        """
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # stack into single ndarrays first: torch.tensor on a tuple of
        # ndarrays is extremely slow and warns on recent PyTorch versions
        states = torch.as_tensor(np.asarray(states), dtype=torch.float32).to(self.device)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64).to(self.device)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32).to(self.device)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32).to(self.device)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32).to(self.device)
        current_values = self.network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
        # bootstrap from the frozen target network; (1 - dones) zeroes the
        # bootstrap term on terminal transitions
        next_values = self.target_network(next_states).max(1)[0].detach()
        target_values = rewards + self.gamma * next_values * (1 - dones)
        loss = nn.MSELoss()(current_values, target_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.network.state_dict())

    def decrease_epsilon(self, decrement_value=0.001, min_epsilon=0.1):
        """Linearly anneal epsilon, clamped below at ``min_epsilon``."""
        self.epsilon = max(self.epsilon - decrement_value, min_epsilon)
if __name__ == '__main__':
    start = time.time()
    # module-level `device` is read by DQNAgent.__init__
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    rewards = []
    env = TransportMatchingEnv(num_drivers=10, num_goods=10)
    agent = DQNAgent(env.combined_state.flatten().shape[0], env.action_dim)
    episodes = 2000
    for episode in tqdm(range(episodes)):
        state = env.reset()
        done = False
        total_reward = 0
        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            agent.train()
            total_reward += reward
            state = next_state
            # bool() handles both numpy bools and plain bools; the original
            # `done.item()` crashed whenever `done` was a Python bool
            done = bool(done)
        agent.decrease_epsilon()
        rewards.append(total_reward)
        # periodically refresh the target network for stable TD targets
        if episode % 50 == 0:
            agent.update_target_network()
    df = pd.DataFrame(data=rewards)
    df.to_excel('sample.xlsx', index=True)
    # NOTE(review): window_size=1 makes the moving average a no-op;
    # increase it to actually smooth the curve
    plot_data(moving_average(data=rewards, window_size=1), title='reward', x_label='epoch', y_label='reward')
    end = time.time()
    print(f'device: {device}')
    print(f'time: {end - start}')
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |