in4050-oblig1/genetic_algorithm.py

439 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from concurrent.futures import ProcessPoolExecutor, wait
from concurrent.futures.thread import ThreadPoolExecutor
from time import time_ns
from typing import List, Self, Tuple
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from common import indexes_to_cities, plot_plan, read_data
class GeneticTSP:
"""A class for solving the travelling salesman problem using a genetic algorithm."""
def __init__(
self,
population: int,
crossover_prob: float,
mutation_prob: float,
data: npt.NDArray,
) -> None:
"""The init method of GeneticTSP.
Args:
population (int): The size of the population for each generation.
crossover_prob (float): The probability of crossover happening.
mutation_prob (float): The probability of a mutation happening.
data (npt.NDarray): An NxN array containing the distances between cities.
Returns:
None
"""
self.generation: int = 0
self.population: int = population
self.data: npt.NDArray = data
self.genes: int = len(data)
self.crossover_prob: float = crossover_prob
self.mutation_prob: float = mutation_prob
self.best_fitness = []
self.generate_first_generation()
def generate_first_generation(self) -> None:
"""Generate the first generation of n random permutations (individuals).
Returns:
None
"""
self.individuals: npt.NDArray = np.array(
[
np.random.permutation(np.arange(self.genes))
for _ in range(self.population)
]
)
def get_distance(self, individual: npt.NDArray) -> float:
"""Get the distance of the circuit that candidate creates.
Args:
individual (npt.NDArray): The circuit to use to calculate the distance.
Returns:
float: The distance of the circuit of the individual.
"""
return np.array(
[self.data[individual[i - 1], individual[i]] for i in range(self.genes)]
).sum()
def fitness(self) -> None:
"""Calculate the fitness of each individual.
Creates a normalized array where individuals with shorter circuits
have a higher fitness.
Returns:
None
"""
distances: npt.NDArray = np.array(
[self.get_distance(individual) for individual in self.individuals]
)
max_distance: float = max(distances)
# invert results so that the shortest distance gets the largest value.
fitness: npt.NDArray = max_distance - distances
# Normalize array.
fitness_sum: float = np.sum(fitness)
## If all individuals are the same, then they have equal probability
if fitness_sum <= 0:
self.fitness_probs = [1.0 / self.population for _ in range(self.population)]
else:
self.fitness_probs = fitness / fitness_sum
self.best_fitness.append(max(self.fitness_probs))
def crossover(
self, parent1: npt.NDArray, parent2: npt.NDArray
) -> Tuple[npt.NDArray, npt.NDArray]:
"""The crossover step when creating a new generation.
Args:
parent1 (npt.NDArray): The first parent to do crossover with.
parent2 (npt.NDArray): The second parent to do crossover with.
Return:
Tuple: The two new individuals for the next generation.
"""
if self.crossover_prob < np.random.random():
return (parent1, parent2)
cut: int = np.random.randint(0, self.genes)
offspring1: npt.NDArray = parent1[:cut]
offspring2: npt.NDArray = parent2[:cut]
# Add the elements not in parent2 as close to in order as possible.
offspring1 = np.concatenate(
(offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))
)
# Add the elements not in parent2 as close to in order as possible.
offspring2 = np.concatenate(
(offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))
)
return (offspring1, offspring2)
def mutate(self, individual: npt.NDArray) -> None:
"""The mutation step when creating a new generation.
Args:
individual (npt.NDArray): The individual to potentially mutate.
Returns:
None
"""
# Decide whether or not to mutate.
if self.mutation_prob < np.random.random():
return
pos1: int = np.random.randint(0, self.genes)
pos2: int = np.random.randint(0, self.genes)
individual[[pos1, pos2]] = individual[[pos2, pos1]]
def select_individual(self) -> npt.NDArray:
"""Select an individual using the fitness probabilities.
Returns:
npt.NDArray: The individual that has been selected.
"""
choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0]
return self.individuals[choice]
def generate_next_generation(self) -> None:
"""Create the next generation of individuals.
Returns:
None
"""
new_generation: List = []
self.fitness()
offspring1: npt.NDArray
offspring2: npt.NDArray
# For each individual, create a new individual
for _ in range(0, self.population, 2):
# Select 2 individuals and perform crossover.
offspring1, offspring2 = self.crossover(
self.select_individual(), self.select_individual()
)
self.mutate(offspring1)
self.mutate(offspring2)
new_generation.append(offspring1)
new_generation.append(offspring2)
self.individuals = np.array(new_generation[: self.population])
def run(self, generations: int = 10) -> Self:
"""Run the genetic algorithm for a certain amount of generations.
Args:
generations (int): the number of generations to run the algorithm.
Returns:
Self: Itself, so that we can use ProcessPoolExecutor.
"""
for _ in range(generations):
self.generate_next_generation()
return self
def get_individuals(self) -> npt.NDArray:
"""Get all candidates.
Returns:
npt.NDArray: The array containing each individual.
"""
return self.individuals
def get_best_individual(self) -> Tuple[float, npt.NDArray]:
"""Get the best individual from all the individuals.
Returns:
Tuple[float, npt.NDArray]: A tuple with the distance and permutation
of the best individual.
"""
res = sorted(
[
(self.get_distance(individual), individual)
for individual in self.individuals
],
key=lambda i: i[0],
)
return res[0]
def test_best_params(data: npt.NDArray) -> Tuple[float, float, float]:
    """Grid-search the crossover and mutation probabilities.

    Every combination of ten evenly spaced probabilities from 0.1 to 1.0 is
    tried with a population of 50 for 100 generations. The global NumPy seed
    is reset to 1987 before each combination so that all of them start from
    the same random state.

    Args:
        data (npt.NDArray): An NxN array containing the distances between cities.

    Returns:
        Tuple[float, float, float]: The shortest distance found, followed by
        the crossover and the mutation probability that produced it. If no
        combination beats infinity the result is ``(inf, 0.0, 0.0)``.
    """
    population: int = 50
    # Both probabilities are searched over the same grid.
    candidate_probs: npt.NDArray = np.linspace(0.1, 1, 10)
    winner: Tuple[float, float, float] = (float("inf"), 0.0, 0.0)
    for c_prob in candidate_probs:
        for m_prob in candidate_probs:
            np.random.seed(1987)
            solver = GeneticTSP(population, c_prob, m_prob, data)
            distance = solver.run(100).get_best_individual()[0]
            # Strict comparison: the first combination reaching a distance keeps it.
            if distance < winner[0]:
                winner = (distance, c_prob, m_prob)
    return winner
if __name__ == "__main__":
    # Experiment driver: for the first 10 cities and for the full data set, run
    # the genetic algorithm 20 times per population size, print timing and
    # distance statistics, plot the best tour, and finally save one figure with
    # the average best fitness per generation of every configuration.
    cities, data = read_data("./european_cities.csv")
    np.random.seed(1987)

    # The crossover and mutation probabilities are hard-coded. To search for
    # them instead, use:
    #     bd, bc, bm = test_best_params(data[:10, :10])
    bc, bm = 0.7, 0.1
    populations = [10, 100, 1000]
    generations = 300
    runs = 20
    # One (average best fitness per generation, legend label) pair per configuration.
    avg_fitness = []

    # (city count used in the output, distance matrix, print the generation count?)
    # Only the first header lists the number of generations.
    experiments = [
        (10, data[:10, :10], True),
        (24, data, False),
    ]

    for n_cities, distance_matrix, show_generations in experiments:
        print(f"Results for {n_cities} cities.")
        if show_generations:
            print(f"generations : {generations}")
        print(f"Crossover probability: {bc}")
        print(f"Mutation probability : {bm}\n")

        for population in populations:
            solvers = [
                GeneticTSP(population, bc, bm, distance_matrix) for _ in range(runs)
            ]

            # NOTE(review): the runs execute in worker processes; with the fork
            # start method the workers presumably inherit the same NumPy global
            # random state, so runs may share random streams. Consider seeding
            # each run explicitly.
            t0 = time_ns()
            with ProcessPoolExecutor() as executor:
                futures = [
                    executor.submit(solver.run, generations) for solver in solvers
                ]
                wait(futures)
            t1 = time_ns()
            t = (t1 - t0) / 1_000_000.0

            # The workers operate on pickled copies, so use the returned objects.
            solvers = [future.result() for future in futures]
            res = sorted(
                (solver.get_best_individual() for solver in solvers),
                key=lambda i: i[0],
            )
            distances = [entry[0] for entry in res]
            best = res[0][0]
            worst = res[-1][0]
            mean = sum(distances) / len(res)
            std_dev = np.sqrt(sum((i - mean) ** 2 for i in distances) / len(res))

            print(f"Results for a population of {population}.")
            print(f"time : {t:>12.6f}ms")
            print(f"best distance : {best:>12.6f}km")
            print(f"worst distance : {worst:>12.6f}km")
            print(f"average distance : {mean:>12.6f}km")
            print(f"standard deviation: {std_dev:>12.6f}km\n")

            fitness = np.array([solver.best_fitness for solver in solvers])
            avg_fitness.append(
                (
                    fitness.sum(axis=0) / len(solvers),
                    f"cities: {n_cities}, population: {population}",
                )
            )

        # Plot the best tour found with the last population size.
        plot_plan(indexes_to_cities(res[0][1], cities))

    # Plot the average best fitnesses
    fig, ax = plt.subplots()
    x = np.arange(len(avg_fitness[0][0]))
    for element in avg_fitness:
        ax.plot(x, element[0], label=element[1])
    ax.set_xlabel("generations")
    ax.set_ylabel("avg best fitness")
    fig.legend()
    fig.savefig("./images/average_fitness.png")
"""Running example
oblig1 on  main [!?⇡] via 🐍 v3.12.6 took 4m37s
python genetic_algorithm.py
Results for 10 cities.
generations : 300
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 717.914256ms
best distance : 7486.310000km
worst distance : 8737.340000km
average distance : 7634.319500km
standard deviation: 283.926327km
Results for a population of 100.
time : 6922.343884ms
best distance : 7486.310000km
worst distance : 7830.010000km
average distance : 7529.078000km
standard deviation: 88.041417km
Results for a population of 1000.
time : 99066.816177ms
best distance : 7486.310000km
worst distance : 7549.160000km
average distance : 7495.113500km
standard deviation: 18.968296km
Results for 24 cities.
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 1441.588712ms
best distance : 15921.410000km
worst distance : 20250.840000km
average distance : 18045.991500km
standard deviation: 1060.673071km
Results for a population of 100.
time : 15143.475494ms
best distance : 13148.120000km
worst distance : 17453.060000km
average distance : 15048.892000km
standard deviation: 1083.912052km
Results for a population of 1000.
time : 151313.539435ms
best distance : 12890.050000km
worst distance : 15798.380000km
average distance : 14121.351500km
standard deviation: 924.716247km
"""