Compare commits

...

2 Commits

Author SHA1 Message Date
d21e448e4a
First finished version 2024-10-06 01:41:53 +02:00
c3c7860b03
Made some modifications 2024-10-05 15:05:23 +02:00
4 changed files with 432 additions and 69 deletions

View File

@ -1,3 +1,13 @@
# IN4050: Obligatory Assignment 1 # IN4050: Obligatory Assignment 1
[Project repository](https://gitea.balaton.dev/IN4050/in4050-oblig1) [Project repository](https://gitea.balaton.dev/IN4050/in4050-oblig1)
Email: [coryab@uio.no](mailto:coryab@uio.no)
## How to run the programs
To run any of the programs, just use this command:
```bash
python <exhaustive_search | hill_climbing | genetic_algorithm>.py
```

View File

@ -1,11 +1,12 @@
import time import time
from itertools import permutations from itertools import permutations
from typing import Tuple from typing import Tuple
from math import factorial
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from common import plot_plan, read_data from common import indexes_to_cities, plot_plan, read_data
def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]: def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
@ -34,12 +35,15 @@ def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
), ),
permutations(range(size)), permutations(range(size)),
), ),
key=lambda x: x[0] # Make sure that it finds the minimal distance
) )
if __name__ == "__main__": if __name__ == "__main__":
cities, data = read_data("./european_cities.csv") cities, data = read_data("./european_cities.csv")
times = {}
# A loop timing finding the optimal solution for different n # A loop timing finding the optimal solution for different n
for n in range(6,11): for n in range(6,11):
# Time exhaustive search # Time exhaustive search
@ -49,32 +53,32 @@ if __name__ == "__main__":
time_elapsed_ms = (t1 - t0) / 1_000_000.0 time_elapsed_ms = (t1 - t0) / 1_000_000.0
times[n] = time_elapsed_ms, distance
if n in (6,10):
city_seq = indexes_to_cities(perm, cities)
plot_plan(city_seq)
print(f"Sequence for {n} cities: {city_seq}")
print("")
for n, (time, distance) in times.items():
print(f"Exhaustive search for the {n} first cities:") print(f"Exhaustive search for the {n} first cities:")
print(f"distance : {distance:>12.6f}km") print(f"{'distance':<25}: {distance:>12.6f}km")
print(f"time to find solution: {time_elapsed_ms:>12.6f}ms\n") print(f"{'time to find solution':<25}: {time:>12.6f}ms")
print(f"{f'time / {n}!':<25}: {time / factorial(n):>12.6f}\n")
"""Running example """Running example
oblig1 on main [?] via 🐍 v3.12.6 took 7s oblig1 on main [!] via 🐍 v3.12.6 took 14s
python exhaustive_search.py python exhaustive_search.py
Exhaustive search for the 6 first cities: Exhaustive search for the 6 first cities:
distance : 5018.810000km distance : 5018.810000km
time to find solution: 1.105330ms time to find solution : 1.485208ms
time / 6! : 0.002063
Exhaustive search for the 7 first cities:
distance : 5487.890000km
time to find solution: 10.089604ms
Exhaustive search for the 8 first cities:
distance : 6667.490000km
time to find solution: 78.810508ms
Exhaustive search for the 9 first cities:
distance : 6678.550000km
time to find solution: 765.676230ms
Exhaustive search for the 10 first cities: Exhaustive search for the 10 first cities:
distance : 7486.310000km distance : 7486.310000km
time to find solution: 8281.795515ms time to find solution : 10980.900480ms
time / 10! : 0.003026
""" """

View File

@ -1,110 +1,438 @@
import random from concurrent.futures import ProcessPoolExecutor, wait
from typing import Tuple from concurrent.futures.thread import ThreadPoolExecutor
from time import time_ns
from typing import List, Self, Tuple
import matplotlib.pyplot as plt
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from common import plot_plan, read_data from common import indexes_to_cities, plot_plan, read_data
class GeneticTSP: class GeneticTSP:
"""A class for solving the travelling salesman problem using a genetic algorithm."""
def __init__(
self,
population: int,
crossover_prob: float,
mutation_prob: float,
data: npt.NDArray,
) -> None:
"""The init method of GeneticTSP.
Args:
population (int): The size of the population for each generation.
crossover_prob (float): The probability of crossover happening.
mutation_prob (float): The probability of a mutation happening.
data (npt.NDarray): An NxN array containing the distances between cities.
Returns:
None
"""
def __init__(self, population: int, crossover_prob: float, mutation_prob: float, data):
self.generation: int = 0 self.generation: int = 0
self.population: int = population self.population: int = population
self.data: npt.NDArray = data self.data: npt.NDArray = data
self.genes: int = len(data) self.genes: int = len(data)
self.crossover_prob: float = crossover_prob self.crossover_prob: float = crossover_prob
self.mutation_prob: float = mutation_prob self.mutation_prob: float = mutation_prob
self.best_fitness = []
self.generate_first_generation() self.generate_first_generation()
def generate_first_generation(self): def generate_first_generation(self) -> None:
self.candidates: npt.NDArray = np.array([np.random.permutation(np.arange(self.genes)) for _ in range(self.population)]) """Generate the first generation of n random permutations (individuals).
Returns:
None
"""
def get_distance(self, candidate): self.individuals: npt.NDArray = np.array(
return sum([self.data[candidate[i - 1], candidate[i]] for i in range(self.genes)]) [
np.random.permutation(np.arange(self.genes))
for _ in range(self.population)
]
)
def get_distance(self, individual: npt.NDArray) -> float:
"""Get the distance of the circuit that candidate creates.
def fitness(self): Args:
distances = np.array([ self.get_distance(candidate) for candidate in self.candidates ]) individual (npt.NDArray): The circuit to use to calculate the distance.
max_distance = max(distances)
fitness = max_distance - distances
fitness_sum = sum(fitness)
self.fitness_probs: npt.NDArray = fitness / fitness_sum
Returns:
float: The distance of the circuit of the individual.
"""
return np.array(
[self.data[individual[i - 1], individual[i]] for i in range(self.genes)]
).sum()
def fitness(self) -> None:
"""Calculate the fitness of each individual.
Creates a normalized array where individuals with shorter circuits
have a higher fitness.
Returns:
None
"""
distances: npt.NDArray = np.array(
[self.get_distance(individual) for individual in self.individuals]
)
max_distance: float = max(distances)
# invert results so that the shortest distance gets the largest value.
fitness: npt.NDArray = max_distance - distances
# Normalize array.
fitness_sum: float = np.sum(fitness)
## If all individuals are the same, then they have equal probability
if fitness_sum <= 0:
self.fitness_probs = [1.0 / self.population for _ in range(self.population)]
else:
self.fitness_probs = fitness / fitness_sum
self.best_fitness.append(max(self.fitness_probs))
def crossover(
self, parent1: npt.NDArray, parent2: npt.NDArray
) -> Tuple[npt.NDArray, npt.NDArray]:
"""The crossover step when creating a new generation.
Args:
parent1 (npt.NDArray): The first parent to do crossover with.
parent2 (npt.NDArray): The second parent to do crossover with.
Return:
Tuple: The two new individuals for the next generation.
"""
def crossover(self, parent1: npt.NDArray, parent2: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]:
if self.crossover_prob < np.random.random(): if self.crossover_prob < np.random.random():
return (parent1, parent2) return (parent1, parent2)
cut: int = np.random.randint(0, self.genes) cut: int = np.random.randint(0, self.genes)
offspring1 = parent1[:cut] offspring1: npt.NDArray = parent1[:cut]
offspring2 = parent2[:cut] offspring2: npt.NDArray = parent2[:cut]
offspring1 = np.concatenate((offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))) # Add the elements not in parent2 as close to in order as possible.
offspring2 = np.concatenate((offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))) offspring1 = np.concatenate(
(offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))
)
# Add the elements not in parent2 as close to in order as possible.
offspring2 = np.concatenate(
(offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))
)
return (offspring1, offspring2) return (offspring1, offspring2)
def mutate(self, individual: npt.NDArray) -> None:
"""The mutation step when creating a new generation.
def mutate(self, offspring): Args:
individual (npt.NDArray): The individual to potentially mutate.
Returns:
None
"""
# Decide whether or not to mutate.
if self.mutation_prob < np.random.random(): if self.mutation_prob < np.random.random():
return return
pos1: int = np.random.randint(0, self.genes) pos1: int = np.random.randint(0, self.genes)
pos2: int = np.random.randint(0, self.genes) pos2: int = np.random.randint(0, self.genes)
offspring[pos1], offspring[pos2] = offspring[pos2], offspring[pos1] individual[[pos1, pos2]] = individual[[pos2, pos1]]
def select_individual(self): def select_individual(self) -> npt.NDArray:
choice = np.random.choice(self.population, 1, p=self.fitness_probs)[0] """Select an individual using the fitness probabilities.
return self.candidates[choice]
Returns:
npt.NDArray: The individual that has been selected.
"""
def generate_next_generation(self): choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0]
new_generation = []
return self.individuals[choice]
def generate_next_generation(self) -> None:
"""Create the next generation of individuals.
Returns:
None
"""
new_generation: List = []
self.fitness() self.fitness()
offspring1: npt.NDArray
offspring2: npt.NDArray
# For each individual, create a new individual
for _ in range(0, self.population, 2): for _ in range(0, self.population, 2):
offspring1, offspring2 = self.crossover(self.select_individual(), self.select_individual()) # Select 2 individuals and perform crossover.
offspring1, offspring2 = self.crossover(
self.select_individual(), self.select_individual()
)
self.mutate(offspring1) self.mutate(offspring1)
self.mutate(offspring2) self.mutate(offspring2)
new_generation.append(offspring1) new_generation.append(offspring1)
new_generation.append(offspring2) new_generation.append(offspring2)
self.candidates = np.array(new_generation) self.individuals = np.array(new_generation[: self.population])
def run(self, generations: int = 10) -> Self:
"""Run the genetic algorithm for a certain amount of generations.
Args:
generations (int): the number of generations to run the algorithm.
Returns:
Self: Itself, so that we can use ProcessPoolExecutor.
"""
def run(self, generations = 10):
for _ in range(generations): for _ in range(generations):
self.generate_next_generation() self.generate_next_generation()
def get_candidates(self): return self
return self.candidates
def get_individuals(self) -> npt.NDArray:
"""Get all candidates.
Returns:
npt.NDArray: The array containing each individual.
"""
return self.individuals
def get_best_individual(self) -> Tuple[float, npt.NDArray]:
"""Get the best individual from all the individuals.
Returns:
Tuple[float, npt.NDArray]: A tuple with the distance and permutation
of the best individual.
"""
res = sorted(
[
(self.get_distance(individual), individual)
for individual in self.individuals
],
key=lambda i: i[0],
)
return res[0]
def test_best_params(data: npt.NDArray) -> Tuple[float, float, float]:
    """Grid-search the crossover and mutation probabilities.

    Every combination of the probabilities 0.1, 0.2, ..., 1.0 is tried with a
    population of 50 over 100 generations. The random state is reset to the
    same seed before each combination, so all combinations start from the
    same random stream.

    Args:
        data (npt.NDArray): An NxN array containing the distances between cities.

    Returns:
        Tuple[float, float, float]: The shortest distance found, followed by
        the crossover and mutation probabilities that produced it.
    """
    population: int = 50
    probabilities: npt.NDArray = np.linspace(0.1, 1, 10)

    # (distance, crossover probability, mutation probability) of the best run.
    best: Tuple[float, float, float] = (float("inf"), 0.0, 0.0)

    for c_prob in probabilities:
        for m_prob in probabilities:
            np.random.seed(1987)
            solver = GeneticTSP(population, c_prob, m_prob, data)
            solver.run(100)
            distance, _ = solver.get_best_individual()
            # Strict comparison: on a tie the earliest combination wins.
            if distance < best[0]:
                best = (distance, c_prob, m_prob)

    return best
def _evaluate_populations(
    data: npt.NDArray,
    city_count: int,
    populations: List[int],
    generations: int,
    crossover_prob: float,
    mutation_prob: float,
    runs: int = 20,
) -> Tuple[List[Tuple[npt.NDArray, str]], npt.NDArray]:
    """Run the genetic algorithm `runs` times per population size and report.

    For every population size the independent runs are executed in a process
    pool, and the wall-clock time together with the best, worst, average and
    standard deviation of the final best distances are printed.

    NOTE(review): the worker processes use NumPy's global random state. With
    the fork start method they presumably all start from the same state -
    confirm that the runs are as independent as intended.

    Args:
        data (npt.NDArray): An NxN array containing the distances between cities.
        city_count (int): The number of cities, only used to label the curves.
        populations (List[int]): The population sizes to evaluate.
        generations (int): The number of generations for every run.
        crossover_prob (float): The probability of crossover happening.
        mutation_prob (float): The probability of a mutation happening.
        runs (int): The number of runs per population size.

    Returns:
        Tuple: A list with one (average best fitness per generation, label)
        pair per population size, and the best permutation found for the last
        population size (None if `populations` is empty).
    """
    curves: List[Tuple[npt.NDArray, str]] = []
    best_tour = None

    for population in populations:
        solvers = [
            GeneticTSP(population, crossover_prob, mutation_prob, data)
            for _ in range(runs)
        ]

        t0 = time_ns()
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(solver.run, generations) for solver in solvers]
            wait(futures)
        t1 = time_ns()
        t = (t1 - t0) / 1_000_000.0

        # run() returns the solver itself, so the results are the evolved
        # copies coming back from the worker processes.
        solvers = [future.result() for future in futures]

        res = sorted(
            (solver.get_best_individual() for solver in solvers),
            key=lambda i: i[0],
        )
        distances = [distance for distance, _ in res]

        best = res[0][0]
        worst = res[-1][0]
        mean = sum(distances) / len(res)
        std_dev = np.sqrt(sum([(i - mean) ** 2 for i in distances]) / len(res))

        print(f"Results for a population of {population}.")
        print(f"time : {t:>12.6f}ms")
        print(f"best distance : {best:>12.6f}km")
        print(f"worst distance : {worst:>12.6f}km")
        print(f"average distance : {mean:>12.6f}km")
        print(f"standard deviation: {std_dev:>12.6f}km\n")

        fitness = np.array([solver.best_fitness for solver in solvers])
        curves.append(
            (
                fitness.sum(axis=0) / len(solvers),
                f"cities: {city_count}, population: {population}",
            )
        )
        best_tour = res[0][1]

    return curves, best_tour


if __name__ == "__main__":
    cities, data = read_data("./european_cities.csv")
    np.random.seed(1987)

    # The parameter sweep is disabled; the probabilities are hard-coded below.
    # print(f"Finding the best parameters for the mutation and crossover probabilities")
    # bd, bc, bm = test_best_params(data[:10, :10])
    # print(f"Best distance : {bd}")
    # print(f"Best crossover probability: {bc}")
    # print(f"Best mutation probability : {bm}\n")
    bc, bm = 0.7, 0.1

    populations = [10, 100, 1000]
    generations = 300
    avg_fitness = []

    # 10 cities
    print("Results for 10 cities.")
    print(f"generations : {generations}")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")

    curves, best_tour = _evaluate_populations(
        data[:10, :10], 10, populations, generations, bc, bm
    )
    avg_fitness.extend(curves)
    # NOTE(review): plots the best tour of the last (largest) population once
    # per city count - confirm this matches the intended placement.
    plot_plan(indexes_to_cities(best_tour, cities))

    # 24 cities
    print("Results for 24 cities.")
    print(f"Crossover probability: {bc}")
    print(f"Mutation probability : {bm}\n")

    # The label is derived from the data size, so these curves are no longer
    # mislabelled as "cities: 10".
    curves, best_tour = _evaluate_populations(
        data, len(data), populations, generations, bc, bm
    )
    avg_fitness.extend(curves)
    plot_plan(indexes_to_cities(best_tour, cities))

    # Plot the average best fitnesses
    fig, ax = plt.subplots()
    x = np.arange(len(avg_fitness[0][0]))

    for curve, label in avg_fitness:
        ax.plot(x, curve, label=label)

    ax.set_xlabel("generations")
    ax.set_ylabel("avg best fitness")

    fig.legend()
    fig.savefig("./images/average_fitness.png")
"""Running example
oblig1 on main [!?] via 🐍 v3.12.6 took 4m37s
python genetic_algorithm.py
Results for 10 cities.
generations : 300
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 717.914256ms
best distance : 7486.310000km
worst distance : 8737.340000km
average distance : 7634.319500km
standard deviation: 283.926327km
Results for a population of 100.
time : 6922.343884ms
best distance : 7486.310000km
worst distance : 7830.010000km
average distance : 7529.078000km
standard deviation: 88.041417km
Results for a population of 1000.
time : 99066.816177ms
best distance : 7486.310000km
worst distance : 7549.160000km
average distance : 7495.113500km
standard deviation: 18.968296km
Results for 24 cities.
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 1441.588712ms
best distance : 15921.410000km
worst distance : 20250.840000km
average distance : 18045.991500km
standard deviation: 1060.673071km
Results for a population of 100.
time : 15143.475494ms
best distance : 13148.120000km
worst distance : 17453.060000km
average distance : 15048.892000km
standard deviation: 1083.912052km
Results for a population of 1000.
time : 151313.539435ms
best distance : 12890.050000km
worst distance : 15798.380000km
average distance : 14121.351500km
standard deviation: 924.716247km
"""

View File

@ -59,8 +59,29 @@ def hill_climbing(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
return (current_distance, perm) return (current_distance, perm)
def test_hill_climbing(data: npt.NDArray, cities: npt.NDArray, runs: int):
    """Run hill climbing several times and report statistics of the results.

    Prints the best, worst and average distance and the standard deviation
    over all runs, and plots the plan of the best run.

    Args:
        data (npt.NDArray): An NxN array containing the distances between cities.
        cities (npt.NDArray): The city names used to translate the indexes of
            the best permutation.
        runs (int): The number of times to run hill climbing.

    Returns:
        None
    """
    # Shortest circuit first, longest last.
    res = sorted(
        [hill_climbing(data) for _ in range(runs)],
        key=lambda n: n[0],
    )
    distances = [distance for distance, _ in res]

    best = distances[0]
    worst = distances[-1]
    mean = sum(distances) / runs
    squared_deviations = [(i - mean) ** 2 for i in distances]
    std_dev = np.sqrt(sum(squared_deviations) / runs)

    print(f"Hill climbing for {len(data)} cities.")
    print(f"best distance : {best:>12.6f}km")
    print(f"worst distance : {worst:>12.6f}km")
    print(f"average distance : {mean:>12.6f}km")
    print(f"standard deviation: {std_dev:>12.6f}km\n")

    # Plot the best one
    best_perm = res[0][1]
    plot_plan(indexes_to_cities(best_perm, cities))
if __name__ == "__main__":
    # Fixed seed so that the reported numbers are reproducible.
    np.random.seed(1987)

    cities, data = read_data("./european_cities.csv")

    # The result of this single run is not used any more, but the call is
    # kept because it advances the random state before the test runs below.
    distance, perm = hill_climbing(data[:10, :10])
    # plot_plan(indexes_to_cities(perm, cities))

    # First the 10 first cities, then the full data set, 20 runs each.
    for subset in (data[:10, :10], data):
        test_hill_climbing(subset, cities, 20)