From d21e448e4a7633fcab450b93b73881051dfced2c Mon Sep 17 00:00:00 2001 From: Cory Balaton Date: Sun, 6 Oct 2024 01:41:53 +0200 Subject: [PATCH] First finished version --- README.md | 10 + genetic_algorithm.py | 422 ++++++++++++++++++++++++++++++++++++++----- hill_climbing.py | 8 +- 3 files changed, 389 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 0df68a8..752733c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,13 @@ # IN4050: Obligatory Assignment 1 [Project repository](https://gitea.balaton.dev/IN4050/in4050-oblig1) + +Email: [coryab@uio.no](mailto:coryab@uio.no) + +## How to run the programs + +To run any of the programs, just use this command: + +```bash +python .py +``` diff --git a/genetic_algorithm.py b/genetic_algorithm.py index 35c2f46..e6371d2 100644 --- a/genetic_algorithm.py +++ b/genetic_algorithm.py @@ -1,110 +1,438 @@ -import random -from typing import Tuple +from concurrent.futures import ProcessPoolExecutor, wait +from concurrent.futures.thread import ThreadPoolExecutor +from time import time_ns +from typing import List, Self, Tuple +import matplotlib.pyplot as plt import numpy as np import numpy.typing as npt -from common import plot_plan, read_data +from common import indexes_to_cities, plot_plan, read_data class GeneticTSP: + """A class for solving the travelling salesman problem using a genetic algorithm.""" + + def __init__( + self, + population: int, + crossover_prob: float, + mutation_prob: float, + data: npt.NDArray, + ) -> None: + """The init method of GeneticTSP. + + Args: + population (int): The size of the population for each generation. + crossover_prob (float): The probability of crossover happening. + mutation_prob (float): The probability of a mutation happening. + data (npt.NDarray): An NxN array containing the distances between cities. + + Returns: + None + """ - def __init__(self, population: int, crossover_prob: float, mutation_prob: float, data): self.generation: int = 0 self.population: int = population self.data: npt.NDArray = data self.genes: int = len(data) self.crossover_prob: float = crossover_prob self.mutation_prob: float = mutation_prob + self.best_fitness = [] self.generate_first_generation() - def generate_first_generation(self): - self.candidates: npt.NDArray = np.array([np.random.permutation(np.arange(self.genes)) for _ in range(self.population)]) + def generate_first_generation(self) -> None: + """Generate the first generation of n random permutations (individuals). + Returns: + None + """ - def get_distance(self, candidate): - return sum([self.data[candidate[i - 1], candidate[i]] for i in range(self.genes)]) + self.individuals: npt.NDArray = np.array( + [ + np.random.permutation(np.arange(self.genes)) + for _ in range(self.population) + ] + ) + def get_distance(self, individual: npt.NDArray) -> float: + """Get the distance of the circuit that candidate creates. - def fitness(self): - distances = np.array([ self.get_distance(candidate) for candidate in self.candidates ]) - max_distance = max(distances) - fitness = max_distance - distances - fitness_sum = sum(fitness) - self.fitness_probs: npt.NDArray = fitness / fitness_sum + Args: + individual (npt.NDArray): The circuit to use to calculate the distance. + Returns: + float: The distance of the circuit of the individual. + """ + + return np.array( + [self.data[individual[i - 1], individual[i]] for i in range(self.genes)] + ).sum() + + def fitness(self) -> None: + """Calculate the fitness of each individual. + + Creates a normalized array where individuals with shorter circuits + have a higher fitness. + + Returns: + None + """ + + distances: npt.NDArray = np.array( + [self.get_distance(individual) for individual in self.individuals] + ) + max_distance: float = max(distances) + + # invert results so that the shortest distance gets the largest value. + fitness: npt.NDArray = max_distance - distances + + # Normalize array. + fitness_sum: float = np.sum(fitness) + + ## If all individuals are the same, then they have equal probability + if fitness_sum <= 0: + self.fitness_probs = [1.0 / self.population for _ in range(self.population)] + else: + self.fitness_probs = fitness / fitness_sum + + self.best_fitness.append(max(self.fitness_probs)) + + def crossover( + self, parent1: npt.NDArray, parent2: npt.NDArray + ) -> Tuple[npt.NDArray, npt.NDArray]: + """The crossover step when creating a new generation. + + Args: + parent1 (npt.NDArray): The first parent to do crossover with. + parent2 (npt.NDArray): The second parent to do crossover with. + + Return: + Tuple: The two new individuals for the next generation. + + """ - def crossover(self, parent1: npt.NDArray, parent2: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]: if self.crossover_prob < np.random.random(): return (parent1, parent2) cut: int = np.random.randint(0, self.genes) - offspring1 = parent1[:cut] - offspring2 = parent2[:cut] + offspring1: npt.NDArray = parent1[:cut] + offspring2: npt.NDArray = parent2[:cut] - offspring1 = np.concatenate((offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))) - offspring2 = np.concatenate((offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))) + # Add the elements not in parent2 as close to in order as possible. + offspring1 = np.concatenate( + (offspring1, np.array([gene for gene in parent2 if gene not in offspring1])) + ) + + # Add the elements not in parent2 as close to in order as possible. + offspring2 = np.concatenate( + (offspring2, np.array([gene for gene in parent1 if gene not in offspring2])) + ) return (offspring1, offspring2) + def mutate(self, individual: npt.NDArray) -> None: + """The mutation step when creating a new generation. - def mutate(self, offspring): + Args: + individual (npt.NDArray): The individual to potentially mutate. + + Returns: + None + """ + + # Decide whether or not to mutate. if self.mutation_prob < np.random.random(): return pos1: int = np.random.randint(0, self.genes) pos2: int = np.random.randint(0, self.genes) - offspring[pos1], offspring[pos2] = offspring[pos2], offspring[pos1] + individual[[pos1, pos2]] = individual[[pos2, pos1]] - def select_individual(self): - choice = np.random.choice(self.population, 1, p=self.fitness_probs)[0] - return self.candidates[choice] + def select_individual(self) -> npt.NDArray: + """Select an individual using the fitness probabilities. + Returns: + npt.NDArray: The individual that has been selected. + """ - def generate_next_generation(self): - new_generation = [] + choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0] + + return self.individuals[choice] + + def generate_next_generation(self) -> None: + """Create the next generation of individuals. + + Returns: + None + """ + + new_generation: List = [] self.fitness() - - for _ in range(0,self.population,2): - offspring1, offspring2 = self.crossover(self.select_individual(), self.select_individual()) + + offspring1: npt.NDArray + offspring2: npt.NDArray + + # For each individual, create a new individual + for _ in range(0, self.population, 2): + # Select 2 individuals and perform crossover. + offspring1, offspring2 = self.crossover( + self.select_individual(), self.select_individual() + ) self.mutate(offspring1) self.mutate(offspring2) new_generation.append(offspring1) new_generation.append(offspring2) - self.candidates = np.array(new_generation) + self.individuals = np.array(new_generation[: self.population]) + def run(self, generations: int = 10) -> Self: + """Run the genetic algorithm for a certain amount of generations. + + Args: + generations (int): the number of generations to run the algorithm. + + Returns: + Self: Itself, so that we can use ProcessPoolExecutor. + """ - def run(self, generations = 10): for _ in range(generations): self.generate_next_generation() - def get_candidates(self): - return self.candidates + return self + + def get_individuals(self) -> npt.NDArray: + """Get all candidates. + + Returns: + npt.NDArray: The array containing each individual. + """ + + return self.individuals + + def get_best_individual(self) -> Tuple[float, npt.NDArray]: + """Get the best individual from all the individuals. + + Returns: + Tuple[float, npt.NDArray]: A tuple with the distance and permutation + of the best individual. + + """ + + res = sorted( + [ + (self.get_distance(individual), individual) + for individual in self.individuals + ], + key=lambda i: i[0], + ) + + return res[0] + + +def test_best_params(data: npt.NDArray) -> Tuple[float, float, float]: + population: int = 50 + crossover_prob: npt.NDArray = np.linspace(0.1, 1, 10) + mutation_prob: npt.NDArray = np.linspace(0.1, 1, 10) + + best_distance: float = float("inf") + best_crossover_prob: float = 0.0 + best_mutation_prob: float = 0.0 + for c_prob in crossover_prob: + for m_prob in mutation_prob: + np.random.seed(1987) + gen = GeneticTSP(population, c_prob, m_prob, data) + gen.run(100) + tmp = gen.get_best_individual() + if tmp[0] < best_distance: + best_distance = tmp[0] + best_crossover_prob = c_prob + best_mutation_prob = m_prob + + return best_distance, best_crossover_prob, best_mutation_prob if __name__ == "__main__": cities, data = read_data("./european_cities.csv") np.random.seed(1987) - gen = GeneticTSP(500, .8, .4, data[:10,:10]) - original_cands = gen.get_candidates() - res = [ (gen.get_distance(cand), cand) for cand in original_cands ] - res.sort(key=lambda i: i[0]) + # print(f"Finding the best parameters for the mutation and crossover probabilities") + # bd, bc, bm = test_best_params(data[:10, :10]) - print(f"Original population") - print(f"Distance: {res[0][0]}") - plot_plan([ cities[i] for i in res[-1][1] ]) + # print(f"Best distance : {bd}") + # print(f"Best crossover probability: {bc}") + # print(f"Best mutation probability : {bm}\n") - gen.run(500) + bd, bc, bm = 0, 0.7, 0.1 + populations = [10, 100, 1000] + generations = 300 + avg_fitness = [] - arr = gen.get_candidates() - res = [ (gen.get_distance(cand), cand) for cand in arr ] - res.sort(key=lambda i: i[0]) + # 10 cities + print(f"Results for 10 cities.") + print(f"generations : {generations}") + print(f"Crossover probability: {bc}") + print(f"Mutation probability : {bm}\n") - print(f"Improved population") - print(f"Distance: {res[0][0]}") - plot_plan([ cities[i] for i in res[0][1] ]) + for population in populations: + arr = [GeneticTSP(population, bc, bm, data[:10, :10]) for _ in range(20)] + + t0 = time_ns() + futures = [] + with ProcessPoolExecutor() as executor: + for obj in arr: + futures.append(executor.submit(obj.run, generations)) + wait(futures) + t1 = time_ns() + + t = (t1 - t0) / 1_000_000.0 + + arr = [future.result() for future in futures] + + res = sorted( + list(map(lambda gen: gen.get_best_individual(), arr)), + key=lambda i: i[0], + ) + + distances = list(map(lambda n: n[0], res)) + best = res[0][0] + worst = res[-1][0] + mean = sum(distances) / len(res) + std_dev = np.sqrt(sum([(i - mean) ** 2 for i in distances]) / len(res)) + + print(f"Results for a population of {population}.") + print(f"time : {t:>12.6f}ms") + print(f"best distance : {best:>12.6f}km") + print(f"worst distance : {worst:>12.6f}km") + print(f"average distance : {mean:>12.6f}km") + print(f"standard deviation: {std_dev:>12.6f}km\n") + + fitness = np.array([gen.best_fitness for gen in arr]) + avg_fitness.append((fitness.sum(axis=0) / len(arr), f"cities: 10, population: {population}")) + # ax.plot( + # np.arange(len(avg_fitness)), + # avg_fitness, + # label=f"cities: 10, population: {population}", + # ) + + plot_plan(indexes_to_cities(res[0][1], cities)) + + # 24 cities + print(f"Results for 24 cities.") + print(f"Crossover probability: {bc}") + print(f"Mutation probability : {bm}\n") + + for population in populations: + arr = [GeneticTSP(population, bc, bm, data) for _ in range(20)] + + t0 = time_ns() + futures = [] + with ProcessPoolExecutor() as executor: + for obj in arr: + futures.append(executor.submit(obj.run, generations)) + wait(futures) + t1 = time_ns() + + t = (t1 - t0) / 1_000_000.0 + + arr = [future.result() for future in futures] + + res = sorted( + list(map(lambda gen: gen.get_best_individual(), arr)), + key=lambda i: i[0], + ) + + distances = list(map(lambda n: n[0], res)) + best = res[0][0] + worst = res[-1][0] + mean = sum(distances) / len(res) + std_dev = np.sqrt(sum([(i - mean) ** 2 for i in distances]) / len(res)) + + print(f"Results for a population of {population}.") + print(f"time : {t:>12.6f}ms") + print(f"best distance : {best:>12.6f}km") + print(f"worst distance : {worst:>12.6f}km") + print(f"average distance : {mean:>12.6f}km") + print(f"standard deviation: {std_dev:>12.6f}km\n") + + fitness = np.array([gen.best_fitness for gen in arr]) + avg_fitness.append((fitness.sum(axis=0) / len(arr), f"cities: 10, population: {population}")) + # ax.plot( + # np.arange(len(avg_fitness)), + # avg_fitness, + # label=f"cities: 24, population: {population}", + # ) + + plot_plan(indexes_to_cities(res[0][1], cities)) + + + # Plot the average best fitnesses + fig, ax = plt.subplots() + + x = np.arange(len(avg_fitness[0][0])) + for element in avg_fitness: + ax.plot(x, element[0], label=element[1]) + + ax.set_xlabel("generations") + ax.set_ylabel("avg best fitness") + fig.legend() + fig.savefig("./images/average_fitness.png") + +"""Running example + +oblig1 on  main [!?⇡] via 🐍 v3.12.6 took 4m37s +❯ python genetic_algorithm.py +Results for 10 cities. +generations : 300 +Crossover probability: 0.7 +Mutation probability : 0.1 + +Results for a population of 10. +time : 717.914256ms +best distance : 7486.310000km +worst distance : 8737.340000km +average distance : 7634.319500km +standard deviation: 283.926327km + +Results for a population of 100. +time : 6922.343884ms +best distance : 7486.310000km +worst distance : 7830.010000km +average distance : 7529.078000km +standard deviation: 88.041417km + +Results for a population of 1000. +time : 99066.816177ms +best distance : 7486.310000km +worst distance : 7549.160000km +average distance : 7495.113500km +standard deviation: 18.968296km + +Results for 24 cities. +Crossover probability: 0.7 +Mutation probability : 0.1 + +Results for a population of 10. +time : 1441.588712ms +best distance : 15921.410000km +worst distance : 20250.840000km +average distance : 18045.991500km +standard deviation: 1060.673071km + +Results for a population of 100. +time : 15143.475494ms +best distance : 13148.120000km +worst distance : 17453.060000km +average distance : 15048.892000km +standard deviation: 1083.912052km + +Results for a population of 1000. +time : 151313.539435ms +best distance : 12890.050000km +worst distance : 15798.380000km +average distance : 14121.351500km +standard deviation: 924.716247km +""" diff --git a/hill_climbing.py b/hill_climbing.py index 6980af4..1805f38 100644 --- a/hill_climbing.py +++ b/hill_climbing.py @@ -66,14 +66,14 @@ def test_hill_climbing(data: npt.NDArray, cities: npt.NDArray, runs: int): distances = list(map(lambda n: n[0], res)) best = res[0][0] worst = res[-1][0] - avg = sum(distances) / runs - standard_deviation = np.sqrt(sum([(i - avg)**2 for i in distances]) / runs) + mean = sum(distances) / runs + std_dev = np.sqrt(sum([(i - mean)**2 for i in distances]) / runs) print(f"Hill climbing for {len(data)} cities.") print(f"best distance : {best:>12.6f}km") print(f"worst distance : {worst:>12.6f}km") - print(f"average distance : {avg:>12.6f}km") - print(f"standard deviation: {standard_deviation:>12.6f}km\n") + print(f"average distance : {mean:>12.6f}km") + print(f"standard deviation: {std_dev:>12.6f}km\n") plot_plan(indexes_to_cities(res[0][1], cities)) # Plot the best one