# File-viewer header captured with this source: 439 lines, 13 KiB, Python.
from concurrent.futures import ProcessPoolExecutor, wait
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from time import time_ns
from typing import List, Self, Tuple

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

from common import indexes_to_cities, plot_plan, read_data
|
||
|
||
|
||
class GeneticTSP:
|
||
"""A class for solving the travelling salesman problem using a genetic algorithm."""
|
||
|
||
def __init__(
|
||
self,
|
||
population: int,
|
||
crossover_prob: float,
|
||
mutation_prob: float,
|
||
data: npt.NDArray,
|
||
) -> None:
|
||
"""The init method of GeneticTSP.
|
||
|
||
Args:
|
||
population (int): The size of the population for each generation.
|
||
crossover_prob (float): The probability of crossover happening.
|
||
mutation_prob (float): The probability of a mutation happening.
|
||
data (npt.NDarray): An NxN array containing the distances between cities.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
self.generation: int = 0
|
||
self.population: int = population
|
||
self.data: npt.NDArray = data
|
||
self.genes: int = len(data)
|
||
self.crossover_prob: float = crossover_prob
|
||
self.mutation_prob: float = mutation_prob
|
||
self.best_fitness = []
|
||
self.generate_first_generation()
|
||
|
||
def generate_first_generation(self) -> None:
|
||
"""Generate the first generation of n random permutations (individuals).
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
self.individuals: npt.NDArray = np.array(
|
||
[
|
||
np.random.permutation(np.arange(self.genes))
|
||
for _ in range(self.population)
|
||
]
|
||
)
|
||
|
||
def get_distance(self, individual: npt.NDArray) -> float:
|
||
"""Get the distance of the circuit that candidate creates.
|
||
|
||
Args:
|
||
individual (npt.NDArray): The circuit to use to calculate the distance.
|
||
|
||
Returns:
|
||
float: The distance of the circuit of the individual.
|
||
"""
|
||
|
||
return np.array(
|
||
[self.data[individual[i - 1], individual[i]] for i in range(self.genes)]
|
||
).sum()
|
||
|
||
def fitness(self) -> None:
|
||
"""Calculate the fitness of each individual.
|
||
|
||
Creates a normalized array where individuals with shorter circuits
|
||
have a higher fitness.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
distances: npt.NDArray = np.array(
|
||
[self.get_distance(individual) for individual in self.individuals]
|
||
)
|
||
max_distance: float = max(distances)
|
||
|
||
# invert results so that the shortest distance gets the largest value.
|
||
fitness: npt.NDArray = max_distance - distances
|
||
|
||
# Normalize array.
|
||
fitness_sum: float = np.sum(fitness)
|
||
|
||
## If all individuals are the same, then they have equal probability
|
||
if fitness_sum <= 0:
|
||
self.fitness_probs = [1.0 / self.population for _ in range(self.population)]
|
||
else:
|
||
self.fitness_probs = fitness / fitness_sum
|
||
|
||
self.best_fitness.append(max(self.fitness_probs))
|
||
|
||
def crossover(
|
||
self, parent1: npt.NDArray, parent2: npt.NDArray
|
||
) -> Tuple[npt.NDArray, npt.NDArray]:
|
||
"""The crossover step when creating a new generation.
|
||
|
||
Args:
|
||
parent1 (npt.NDArray): The first parent to do crossover with.
|
||
parent2 (npt.NDArray): The second parent to do crossover with.
|
||
|
||
Return:
|
||
Tuple: The two new individuals for the next generation.
|
||
|
||
"""
|
||
|
||
if self.crossover_prob < np.random.random():
|
||
return (parent1, parent2)
|
||
|
||
cut: int = np.random.randint(0, self.genes)
|
||
|
||
offspring1: npt.NDArray = parent1[:cut]
|
||
offspring2: npt.NDArray = parent2[:cut]
|
||
|
||
# Add the elements not in parent2 as close to in order as possible.
|
||
offspring1 = np.concatenate(
|
||
(offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))
|
||
)
|
||
|
||
# Add the elements not in parent2 as close to in order as possible.
|
||
offspring2 = np.concatenate(
|
||
(offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))
|
||
)
|
||
|
||
return (offspring1, offspring2)
|
||
|
||
def mutate(self, individual: npt.NDArray) -> None:
|
||
"""The mutation step when creating a new generation.
|
||
|
||
Args:
|
||
individual (npt.NDArray): The individual to potentially mutate.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
# Decide whether or not to mutate.
|
||
if self.mutation_prob < np.random.random():
|
||
return
|
||
|
||
pos1: int = np.random.randint(0, self.genes)
|
||
pos2: int = np.random.randint(0, self.genes)
|
||
|
||
individual[[pos1, pos2]] = individual[[pos2, pos1]]
|
||
|
||
def select_individual(self) -> npt.NDArray:
|
||
"""Select an individual using the fitness probabilities.
|
||
|
||
Returns:
|
||
npt.NDArray: The individual that has been selected.
|
||
"""
|
||
|
||
choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0]
|
||
|
||
return self.individuals[choice]
|
||
|
||
def generate_next_generation(self) -> None:
|
||
"""Create the next generation of individuals.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
|
||
new_generation: List = []
|
||
self.fitness()
|
||
|
||
offspring1: npt.NDArray
|
||
offspring2: npt.NDArray
|
||
|
||
# For each individual, create a new individual
|
||
for _ in range(0, self.population, 2):
|
||
# Select 2 individuals and perform crossover.
|
||
offspring1, offspring2 = self.crossover(
|
||
self.select_individual(), self.select_individual()
|
||
)
|
||
self.mutate(offspring1)
|
||
self.mutate(offspring2)
|
||
new_generation.append(offspring1)
|
||
new_generation.append(offspring2)
|
||
|
||
self.individuals = np.array(new_generation[: self.population])
|
||
|
||
def run(self, generations: int = 10) -> Self:
|
||
"""Run the genetic algorithm for a certain amount of generations.
|
||
|
||
Args:
|
||
generations (int): the number of generations to run the algorithm.
|
||
|
||
Returns:
|
||
Self: Itself, so that we can use ProcessPoolExecutor.
|
||
"""
|
||
|
||
for _ in range(generations):
|
||
self.generate_next_generation()
|
||
|
||
return self
|
||
|
||
def get_individuals(self) -> npt.NDArray:
|
||
"""Get all candidates.
|
||
|
||
Returns:
|
||
npt.NDArray: The array containing each individual.
|
||
"""
|
||
|
||
return self.individuals
|
||
|
||
def get_best_individual(self) -> Tuple[float, npt.NDArray]:
|
||
"""Get the best individual from all the individuals.
|
||
|
||
Returns:
|
||
Tuple[float, npt.NDArray]: A tuple with the distance and permutation
|
||
of the best individual.
|
||
|
||
"""
|
||
|
||
res = sorted(
|
||
[
|
||
(self.get_distance(individual), individual)
|
||
for individual in self.individuals
|
||
],
|
||
key=lambda i: i[0],
|
||
)
|
||
|
||
return res[0]
|
||
|
||
|
||
def test_best_params(data: npt.NDArray) -> Tuple[float, float, float]:
    """Grid-search the crossover and mutation probabilities.

    Every combination of ten crossover and ten mutation probabilities, evenly
    spaced from 0.1 to 1, is evaluated with a population of 50 evolved for
    100 generations. NumPy's global RNG is reseeded with 1987 before each
    combination.

    Args:
        data (npt.NDArray): Distance matrix handed to ``GeneticTSP``.

    Returns:
        Tuple[float, float, float]: The shortest distance found, followed by
        the crossover and mutation probabilities that produced it. The first
        combination reaching that distance wins.
    """
    pop_size: int = 50
    crossover_grid: npt.NDArray = np.linspace(0.1, 1, 10)
    mutation_grid: npt.NDArray = np.linspace(0.1, 1, 10)

    # (distance, crossover probability, mutation probability) of the leader.
    leader = (float("inf"), 0.0, 0.0)

    combinations = ((c, m) for c in crossover_grid for m in mutation_grid)
    for c_prob, m_prob in combinations:
        np.random.seed(1987)
        solver = GeneticTSP(pop_size, c_prob, m_prob, data)
        solver.run(100)
        distance = solver.get_best_individual()[0]

        # Strict comparison keeps the earliest combination on ties.
        if distance < leader[0]:
            leader = (distance, c_prob, m_prob)

    return leader
|
||
|
||
|
||
def _run_experiment(
    num_cities: int,
    distances: npt.NDArray,
    cities,
    populations: List[int],
    generations: int,
    crossover_prob: float,
    mutation_prob: float,
    runs: int = 20,
) -> List[Tuple[npt.NDArray, str]]:
    """Run the genetic algorithm for every population size and report on it.

    For each population size, ``runs`` independent ``GeneticTSP`` solvers are
    evolved in a process pool; the wall-clock time and the best, worst, mean
    and standard deviation of the resulting tour lengths are printed. After
    the last population size, the best tour of that population size is
    handed to ``plot_plan``.

    Args:
        num_cities: Number of cities, used only for the curve labels.
        distances: Distance matrix passed to ``GeneticTSP``.
        cities: City names passed to ``indexes_to_cities``.
        populations: Population sizes to evaluate.
        generations: Number of generations each solver is run for.
        crossover_prob: Crossover probability passed to ``GeneticTSP``.
        mutation_prob: Mutation probability passed to ``GeneticTSP``.
        runs: Number of solvers per population size.

    Returns:
        One ``(average best fitness per generation, label)`` pair per
        population size.
    """
    curves: List[Tuple[npt.NDArray, str]] = []
    best_tour = None

    for population in populations:
        solvers = [
            GeneticTSP(population, crossover_prob, mutation_prob, distances)
            for _ in range(runs)
        ]

        # NOTE(review): the solvers draw from NumPy's global RNG inside the
        # worker processes. With the "fork" start method the workers inherit
        # the parent's RNG state, so runs may reuse the same random stream --
        # consider seeding each run explicitly.
        t0 = time_ns()
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(solver.run, generations) for solver in solvers]
            wait(futures)
        t1 = time_ns()

        t = (t1 - t0) / 1_000_000.0

        # run() returns the solver, so the evolved state is in the results.
        solvers = [future.result() for future in futures]

        res = sorted(
            (solver.get_best_individual() for solver in solvers),
            key=lambda pair: pair[0],
        )

        tour_lengths = np.array([pair[0] for pair in res])
        best = res[0][0]
        worst = res[-1][0]
        mean = tour_lengths.mean()
        std_dev = tour_lengths.std()  # population standard deviation (ddof=0)

        print(f"Results for a population of {population}.")
        print(f"time : {t:>12.6f}ms")
        print(f"best distance : {best:>12.6f}km")
        print(f"worst distance : {worst:>12.6f}km")
        print(f"average distance : {mean:>12.6f}km")
        print(f"standard deviation: {std_dev:>12.6f}km\n")

        fitness = np.array([solver.best_fitness for solver in solvers])
        curves.append(
            (
                fitness.mean(axis=0),
                f"cities: {num_cities}, population: {population}",
            )
        )
        best_tour = res[0][1]

    # NOTE(review): the source's indentation was lost, so it is unclear
    # whether this plot originally ran once per population size or once after
    # the loop; it is assumed to run once, for the last population size.
    if best_tour is not None:
        plot_plan(indexes_to_cities(best_tour, cities))

    return curves


if __name__ == "__main__":
    cities, data = read_data("./european_cities.csv")

    np.random.seed(1987)

    # The probabilities below are hard-coded; the disabled search shows how
    # they can be re-derived with test_best_params.
    # print(f"Finding the best parameters for the mutation and crossover probabilities")
    # bd, bc, bm = test_best_params(data[:10, :10])

    # print(f"Best distance : {bd}")
    # print(f"Best crossover probability: {bc}")
    # print(f"Best mutation probability : {bm}\n")

    bc, bm = 0.7, 0.1
    populations = [10, 100, 1000]
    generations = 300
    avg_fitness = []

    # 10 cities
    print("Results for 10 cities.")
    print(f"generations : {generations}")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")

    avg_fitness.extend(
        _run_experiment(10, data[:10, :10], cities, populations, generations, bc, bm)
    )

    # 24 cities
    print("Results for 24 cities.")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")

    # The curves are labelled "cities: 24" here; the original reused the
    # 10-city label, which made the legend entries indistinguishable.
    avg_fitness.extend(
        _run_experiment(24, data, cities, populations, generations, bc, bm)
    )

    # Plot the average best fitnesses
    fig, ax = plt.subplots()

    for curve, label in avg_fitness:
        ax.plot(np.arange(len(curve)), curve, label=label)

    ax.set_xlabel("generations")
    ax.set_ylabel("avg best fitness")
    fig.legend()

    # Create the output directory first so a missing folder does not discard
    # the results of a multi-minute run.
    Path("./images").mkdir(parents=True, exist_ok=True)
    fig.savefig("./images/average_fitness.png")
|
||
|
||
"""Running example
|
||
|
||
oblig1 on main [!?⇡] via 🐍 v3.12.6 took 4m37s
|
||
❯ python genetic_algorithm.py
|
||
Results for 10 cities.
|
||
generations : 300
|
||
Crossover probability: 0.7
|
||
Mutation probability : 0.1
|
||
|
||
Results for a population of 10.
|
||
time : 717.914256ms
|
||
best distance : 7486.310000km
|
||
worst distance : 8737.340000km
|
||
average distance : 7634.319500km
|
||
standard deviation: 283.926327km
|
||
|
||
Results for a population of 100.
|
||
time : 6922.343884ms
|
||
best distance : 7486.310000km
|
||
worst distance : 7830.010000km
|
||
average distance : 7529.078000km
|
||
standard deviation: 88.041417km
|
||
|
||
Results for a population of 1000.
|
||
time : 99066.816177ms
|
||
best distance : 7486.310000km
|
||
worst distance : 7549.160000km
|
||
average distance : 7495.113500km
|
||
standard deviation: 18.968296km
|
||
|
||
Results for 24 cities.
|
||
Crossover probability: 0.7
|
||
Mutation probability : 0.1
|
||
|
||
Results for a population of 10.
|
||
time : 1441.588712ms
|
||
best distance : 15921.410000km
|
||
worst distance : 20250.840000km
|
||
average distance : 18045.991500km
|
||
standard deviation: 1060.673071km
|
||
|
||
Results for a population of 100.
|
||
time : 15143.475494ms
|
||
best distance : 13148.120000km
|
||
worst distance : 17453.060000km
|
||
average distance : 15048.892000km
|
||
standard deviation: 1083.912052km
|
||
|
||
Results for a population of 1000.
|
||
time : 151313.539435ms
|
||
best distance : 12890.050000km
|
||
worst distance : 15798.380000km
|
||
average distance : 14121.351500km
|
||
standard deviation: 924.716247km
|
||
"""
|