from concurrent.futures import ProcessPoolExecutor, wait
from concurrent.futures.thread import ThreadPoolExecutor
from time import time_ns
from typing import List, Self, Tuple

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

from common import indexes_to_cities, plot_plan, read_data


class GeneticTSP:
    """A class for solving the travelling salesman problem using a genetic algorithm."""

    def __init__(
        self,
        population: int,
        crossover_prob: float,
        mutation_prob: float,
        data: npt.NDArray,
    ) -> None:
        """The init method of GeneticTSP.

        Args:
            population (int): The size of the population for each generation.
            crossover_prob (float): The probability of crossover happening.
            mutation_prob (float): The probability of a mutation happening.
            data (npt.NDArray): An NxN array containing the distances between
                cities.

        Returns:
            None
        """
        # Number of generations produced so far by generate_next_generation.
        self.generation: int = 0
        self.population: int = population
        self.data: npt.NDArray = data
        # One gene per city: an individual is a permutation of range(genes).
        self.genes: int = len(data)
        self.crossover_prob: float = crossover_prob
        self.mutation_prob: float = mutation_prob
        # Highest selection probability of each generation, in order.
        self.best_fitness: List[float] = []
        self.generate_first_generation()

    def generate_first_generation(self) -> None:
        """Generate the first generation of n random permutations (individuals).

        Returns:
            None
        """
        self.individuals: npt.NDArray = np.array(
            [
                np.random.permutation(np.arange(self.genes))
                for _ in range(self.population)
            ]
        )

    def get_distance(self, individual: npt.NDArray) -> float:
        """Get the distance of the circuit that an individual creates.

        The circuit is closed: the leg from the last city back to the first
        one is included.

        Args:
            individual (npt.NDArray): The circuit to use to calculate the
                distance.

        Returns:
            float: The distance of the circuit of the individual.
        """
        # np.roll(individual, 1)[i] == individual[i - 1], so every leg
        # (previous city -> city) is looked up in one indexing operation.
        previous = np.roll(individual, 1)
        return float(self.data[previous, individual].sum())

    def fitness(self) -> None:
        """Calculate the fitness of each individual.

        Stores in ``self.fitness_probs`` a normalized array where individuals
        with shorter circuits have a higher value, and appends the largest of
        those values to ``self.best_fitness``.

        Returns:
            None
        """
        distances: npt.NDArray = np.array(
            [self.get_distance(individual) for individual in self.individuals]
        )
        # Invert results so that the shortest distance gets the largest value.
        fitness: npt.NDArray = distances.max() - distances
        fitness_sum: float = fitness.sum()
        if fitness_sum <= 0:
            # All circuits have the same length, so every individual gets the
            # same selection probability.
            self.fitness_probs = np.full(self.population, 1.0 / self.population)
        else:
            self.fitness_probs = fitness / fitness_sum
        self.best_fitness.append(float(self.fitness_probs.max()))

    def crossover(
        self, parent1: npt.NDArray, parent2: npt.NDArray
    ) -> Tuple[npt.NDArray, npt.NDArray]:
        """The crossover step when creating a new generation.

        With probability ``crossover_prob`` each offspring takes the genes
        before a random cut point from one parent and the remaining genes in
        the order in which they appear in the other parent. Otherwise the
        offspring are copies of the parents.

        Args:
            parent1 (npt.NDArray): The first parent to do crossover with.
            parent2 (npt.NDArray): The second parent to do crossover with.

        Returns:
            Tuple: The two new individuals for the next generation. They are
            always new arrays, never views of the parents.
        """
        if self.crossover_prob < np.random.random():
            # Copy: the parents are rows of the current population and the
            # offspring are mutated in place afterwards. Returning the parents
            # themselves would alter the population that is still being
            # sampled, and would alias both offspring when the same parent is
            # selected twice.
            return (parent1.copy(), parent2.copy())

        cut: int = np.random.randint(0, self.genes)
        head1: npt.NDArray = parent1[:cut]
        head2: npt.NDArray = parent2[:cut]
        # Append the genes of the other parent that are not already in the
        # head, keeping that parent's order.
        offspring1: npt.NDArray = np.concatenate(
            (head1, parent2[~np.isin(parent2, head1)])
        )
        offspring2: npt.NDArray = np.concatenate(
            (head2, parent1[~np.isin(parent1, head2)])
        )
        return (offspring1, offspring2)

    def mutate(self, individual: npt.NDArray) -> None:
        """The mutation step when creating a new generation.

        With probability ``mutation_prob`` two random positions of the
        individual are swapped in place. Both positions are drawn
        independently, so they may coincide and leave the individual unchanged.

        Args:
            individual (npt.NDArray): The individual to potentially mutate.

        Returns:
            None
        """
        # Decide whether or not to mutate.
        if self.mutation_prob < np.random.random():
            return
        pos1: int = np.random.randint(0, self.genes)
        pos2: int = np.random.randint(0, self.genes)
        individual[[pos1, pos2]] = individual[[pos2, pos1]]

    def select_individual(self) -> npt.NDArray:
        """Select an individual using the fitness probabilities.

        Returns:
            npt.NDArray: The individual that has been selected. This is a row
            of ``self.individuals``, not a copy.
        """
        choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0]
        return self.individuals[choice]

    def generate_next_generation(self) -> None:
        """Create the next generation of individuals.

        Returns:
            None
        """
        self.fitness()
        new_generation: List[npt.NDArray] = []
        # Each iteration yields two offspring, so step by two.
        for _ in range(0, self.population, 2):
            # Select 2 individuals and perform crossover.
            offspring1, offspring2 = self.crossover(
                self.select_individual(), self.select_individual()
            )
            self.mutate(offspring1)
            self.mutate(offspring2)
            new_generation.extend((offspring1, offspring2))
        # An odd population size produces one offspring too many; drop it.
        self.individuals = np.array(new_generation[: self.population])
        self.generation += 1

    def run(self, generations: int = 10) -> Self:
        """Run the genetic algorithm for a certain amount of generations.

        Args:
            generations (int): the number of generations to run the algorithm.

        Returns:
            Self: Itself, so that we can use ProcessPoolExecutor.
        """
        for _ in range(generations):
            self.generate_next_generation()
        return self

    def get_individuals(self) -> npt.NDArray:
        """Get all candidates.

        Returns:
            npt.NDArray: The array containing each individual.
        """
        return self.individuals

    def get_best_individual(self) -> Tuple[float, npt.NDArray]:
        """Get the best individual from all the individuals.

        Returns:
            Tuple[float, npt.NDArray]: A tuple with the distance and
            permutation of the best individual. On ties the first individual
            with the shortest distance is returned.
        """
        distances = [self.get_distance(individual) for individual in self.individuals]
        best_index = int(np.argmin(distances))
        return (distances[best_index], self.individuals[best_index])


def test_best_params(data: npt.NDArray) -> Tuple[float, float, float]:
    """Grid-search the crossover and mutation probabilities.

    Every combination of ten crossover and ten mutation probabilities between
    0.1 and 1 is run for 100 generations with a population of 50. The random
    seed is reset before every run.

    Args:
        data (npt.NDArray): An NxN array containing the distances between
            cities.

    Returns:
        Tuple[float, float, float]: The shortest distance found, followed by
        the crossover and mutation probabilities that produced it.
    """
    population: int = 50
    crossover_prob: npt.NDArray = np.linspace(0.1, 1, 10)
    mutation_prob: npt.NDArray = np.linspace(0.1, 1, 10)
    best_distance: float = float("inf")
    best_crossover_prob: float = 0.0
    best_mutation_prob: float = 0.0
    for c_prob in crossover_prob:
        for m_prob in mutation_prob:
            np.random.seed(1987)
            gen = GeneticTSP(population, c_prob, m_prob, data)
            gen.run(100)
            distance = gen.get_best_individual()[0]
            if distance < best_distance:
                best_distance = distance
                best_crossover_prob = c_prob
                best_mutation_prob = m_prob
    return best_distance, best_crossover_prob, best_mutation_prob


def _run_experiment(
    n_cities: int,
    data: npt.NDArray,
    cities,
    populations: List[int],
    generations: int,
    crossover_prob: float,
    mutation_prob: float,
    runs: int = 20,
) -> List[Tuple[npt.NDArray, str]]:
    """Run the algorithm for every population size and report the results.

    For each population size, ``runs`` independent solvers are evolved in
    worker processes, the statistics of their best distances are printed, and
    the per-generation best fitness is averaged over the runs.

    Args:
        n_cities (int): The number of cities, used only to label the curves.
        data (npt.NDArray): An NxN array containing the distances between
            cities.
        cities: The city names, passed on to indexes_to_cities.
        populations (List[int]): The population sizes to try.
        generations (int): The number of generations of each run.
        crossover_prob (float): The probability of crossover happening.
        mutation_prob (float): The probability of a mutation happening.
        runs (int): The number of independent runs per population size.

    Returns:
        List[Tuple[npt.NDArray, str]]: One (average best fitness per
        generation, label) pair per population size.
    """
    curves: List[Tuple[npt.NDArray, str]] = []
    res: List[Tuple[float, npt.NDArray]] = []
    for population in populations:
        solvers = [
            GeneticTSP(population, crossover_prob, mutation_prob, data)
            for _ in range(runs)
        ]

        t0 = time_ns()
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(solver.run, generations) for solver in solvers]
            wait(futures)
        t1 = time_ns()
        t = (t1 - t0) / 1_000_000.0

        # run() returns the solver itself; the evolved state only exists in
        # the objects sent back by the workers, not in the local ones.
        solvers = [future.result() for future in futures]
        res = sorted(
            (solver.get_best_individual() for solver in solvers),
            key=lambda i: i[0],
        )
        distances = [distance for distance, _ in res]
        best = res[0][0]
        worst = res[-1][0]
        mean = sum(distances) / len(res)
        std_dev = np.sqrt(sum((i - mean) ** 2 for i in distances) / len(res))

        print(f"Results for a population of {population}.")
        print(f"time : {t:>12.6f}ms")
        print(f"best distance : {best:>12.6f}km")
        print(f"worst distance : {worst:>12.6f}km")
        print(f"average distance : {mean:>12.6f}km")
        print(f"standard deviation: {std_dev:>12.6f}km\n")

        fitness = np.array([solver.best_fitness for solver in solvers])
        curves.append(
            (
                fitness.sum(axis=0) / len(solvers),
                f"cities: {n_cities}, population: {population}",
            )
        )

    # NOTE(review): the original indentation was lost; the plan is assumed to
    # be plotted once, after all population sizes ran, for the best tour of
    # the last population size - confirm.
    if res:
        plot_plan(indexes_to_cities(res[0][1], cities))
    return curves


if __name__ == "__main__":
    cities, data = read_data("./european_cities.csv")
    np.random.seed(1987)

    # print(f"Finding the best parameters for the mutation and crossover probabilities")
    # bd, bc, bm = test_best_params(data[:10, :10])
    # print(f"Best distance : {bd}")
    # print(f"Best crossover probability: {bc}")
    # print(f"Best mutation probability : {bm}\n")
    bd, bc, bm = 0, 0.7, 0.1

    populations = [10, 100, 1000]
    generations = 300
    avg_fitness = []

    # 10 cities
    print("Results for 10 cities.")
    print(f"generations : {generations}")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")
    avg_fitness.extend(
        _run_experiment(10, data[:10, :10], cities, populations, generations, bc, bm)
    )

    # 24 cities
    print("Results for 24 cities.")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")
    avg_fitness.extend(
        _run_experiment(24, data, cities, populations, generations, bc, bm)
    )

    # Plot the average best fitnesses
    fig, ax = plt.subplots()
    x = np.arange(len(avg_fitness[0][0]))
    for curve, label in avg_fitness:
        ax.plot(x, curve, label=label)
    ax.set_xlabel("generations")
    ax.set_ylabel("avg best fitness")
    fig.legend()
    fig.savefig("./images/average_fitness.png")

"""Running example

oblig1 on  main [!?⇡] via 🐍 v3.12.6 took 4m37s
❯ python genetic_algorithm.py
Results for 10 cities.
generations : 300
Crossover probability: 0.7
Mutation probability : 0.1

Results for a population of 10.
time : 717.914256ms
best distance : 7486.310000km
worst distance : 8737.340000km
average distance : 7634.319500km
standard deviation: 283.926327km

Results for a population of 100.
time : 6922.343884ms
best distance : 7486.310000km
worst distance : 7830.010000km
average distance : 7529.078000km
standard deviation: 88.041417km

Results for a population of 1000.
time : 99066.816177ms
best distance : 7486.310000km
worst distance : 7549.160000km
average distance : 7495.113500km
standard deviation: 18.968296km

Results for 24 cities.
Crossover probability: 0.7
Mutation probability : 0.1

Results for a population of 10.
time : 1441.588712ms
best distance : 15921.410000km
worst distance : 20250.840000km
average distance : 18045.991500km
standard deviation: 1060.673071km

Results for a population of 100.
time : 15143.475494ms
best distance : 13148.120000km
worst distance : 17453.060000km
average distance : 15048.892000km
standard deviation: 1083.912052km

Results for a population of 1000.
time : 151313.539435ms
best distance : 12890.050000km
worst distance : 15798.380000km
average distance : 14121.351500km
standard deviation: 924.716247km
"""