Compare commits

..

No commits in common. "d21e448e4a7633fcab450b93b73881051dfced2c" and "b47a1a495547680652e417766a480d495f41cb52" have entirely different histories.

4 changed files with 69 additions and 432 deletions

View File

@ -1,13 +1,3 @@
# IN4050: Obligatory Assignment 1 # IN4050: Obligatory Assignment 1
[Project repository](https://gitea.balaton.dev/IN4050/in4050-oblig1) [Project repository](https://gitea.balaton.dev/IN4050/in4050-oblig1)
Email: [coryab@uio.no](mailto:coryab@uio.no)
## How to run the programs
To run any of the programs, just use this command:
```bash
python <exhaustive_search \| hill_climbing \| genetic_algorithm>.py
```

View File

@ -1,12 +1,11 @@
import time import time
from itertools import permutations from itertools import permutations
from typing import Tuple from typing import Tuple
from math import factorial
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from common import indexes_to_cities, plot_plan, read_data from common import plot_plan, read_data
def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]: def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
@ -35,17 +34,14 @@ def exhaustive_search(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
), ),
permutations(range(size)), permutations(range(size)),
), ),
key=lambda x: x[0] # Make sure that it finds the minimal distance
) )
if __name__ == "__main__": if __name__ == "__main__":
cities, data = read_data("./european_cities.csv") cities, data = read_data("./european_cities.csv")
times = {}
# A loop timing finding the optimal solution for different n # A loop timing finding the optimal solution for different n
for n in range(6,11): for n in range(6, 11):
# Time exhaustive search # Time exhaustive search
t0 = time.time_ns() t0 = time.time_ns()
distance, perm = exhaustive_search(data[:n, :n]) distance, perm = exhaustive_search(data[:n, :n])
@ -53,32 +49,32 @@ if __name__ == "__main__":
time_elapsed_ms = (t1 - t0) / 1_000_000.0 time_elapsed_ms = (t1 - t0) / 1_000_000.0
times[n] = time_elapsed_ms, distance
if n in (6,10):
city_seq = indexes_to_cities(perm, cities)
plot_plan(city_seq)
print(f"Sequence for {n} cities: {city_seq}")
print("")
for n, (time, distance) in times.items():
print(f"Exhaustive search for the {n} first cities:") print(f"Exhaustive search for the {n} first cities:")
print(f"{'distance':<25}: {distance:>12.6f}km") print(f"distance : {distance:>12.6f}km")
print(f"{'time to find solution':<25}: {time:>12.6f}ms") print(f"time to find solution: {time_elapsed_ms:>12.6f}ms\n")
print(f"{f'time / {n}!':<25}: {time / factorial(n):>12.6f}\n")
"""Running example """Running example
oblig1 on main [!] via 🐍 v3.12.6 took 14s oblig1 on main [?] via 🐍 v3.12.6 took 7s
python exhaustive_search.py python exhaustive_search.py
Exhaustive search for the 6 first cities: Exhaustive search for the 6 first cities:
distance : 5018.810000km distance : 5018.810000km
time to find solution : 1.485208ms time to find solution: 1.105330ms
time / 6! : 0.002063
Exhaustive search for the 7 first cities:
distance : 5487.890000km
time to find solution: 10.089604ms
Exhaustive search for the 8 first cities:
distance : 6667.490000km
time to find solution: 78.810508ms
Exhaustive search for the 9 first cities:
distance : 6678.550000km
time to find solution: 765.676230ms
Exhaustive search for the 10 first cities: Exhaustive search for the 10 first cities:
distance : 7486.310000km distance : 7486.310000km
time to find solution : 10980.900480ms time to find solution: 8281.795515ms
time / 10! : 0.003026
""" """

View File

@ -1,438 +1,110 @@
from concurrent.futures import ProcessPoolExecutor, wait import random
from concurrent.futures.thread import ThreadPoolExecutor from typing import Tuple
from time import time_ns
from typing import List, Self, Tuple
import matplotlib.pyplot as plt
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from common import indexes_to_cities, plot_plan, read_data from common import plot_plan, read_data
class GeneticTSP: class GeneticTSP:
"""A class for solving the travelling salesman problem using a genetic algorithm."""
def __init__(
self,
population: int,
crossover_prob: float,
mutation_prob: float,
data: npt.NDArray,
) -> None:
"""The init method of GeneticTSP.
Args:
population (int): The size of the population for each generation.
crossover_prob (float): The probability of crossover happening.
mutation_prob (float): The probability of a mutation happening.
data (npt.NDarray): An NxN array containing the distances between cities.
Returns:
None
"""
def __init__(self, population: int, crossover_prob: float, mutation_prob: float, data):
self.generation: int = 0 self.generation: int = 0
self.population: int = population self.population: int = population
self.data: npt.NDArray = data self.data: npt.NDArray = data
self.genes: int = len(data) self.genes: int = len(data)
self.crossover_prob: float = crossover_prob self.crossover_prob: float = crossover_prob
self.mutation_prob: float = mutation_prob self.mutation_prob: float = mutation_prob
self.best_fitness = []
self.generate_first_generation() self.generate_first_generation()
def generate_first_generation(self) -> None: def generate_first_generation(self):
"""Generate the first generation of n random permutations (individuals). self.candidates: npt.NDArray = np.array([np.random.permutation(np.arange(self.genes)) for _ in range(self.population)])
Returns:
None
"""
self.individuals: npt.NDArray = np.array( def get_distance(self, candidate):
[ return sum([self.data[candidate[i - 1], candidate[i]] for i in range(self.genes)])
np.random.permutation(np.arange(self.genes))
for _ in range(self.population)
]
)
def get_distance(self, individual: npt.NDArray) -> float:
"""Get the distance of the circuit that candidate creates.
Args: def fitness(self):
individual (npt.NDArray): The circuit to use to calculate the distance. distances = np.array([ self.get_distance(candidate) for candidate in self.candidates ])
max_distance = max(distances)
fitness = max_distance - distances
fitness_sum = sum(fitness)
self.fitness_probs: npt.NDArray = fitness / fitness_sum
Returns:
float: The distance of the circuit of the individual.
"""
return np.array(
[self.data[individual[i - 1], individual[i]] for i in range(self.genes)]
).sum()
def fitness(self) -> None:
    """Calculate the selection probability of each individual.

    The circuit length of every individual is subtracted from the longest
    circuit length in the population, so shorter circuits get larger values
    and the longest circuit gets zero. The values are then normalized so
    they sum to one and stored in ``self.fitness_probs``. The largest
    probability of this generation is appended to ``self.best_fitness``.

    Returns:
        None
    """
    distances: npt.NDArray = np.array(
        [self.get_distance(individual) for individual in self.individuals]
    )

    # Invert the results so that the shortest distance gets the largest value.
    fitness: npt.NDArray = distances.max() - distances
    fitness_sum: float = fitness.sum()

    if fitness_sum <= 0:
        # Every circuit has the same length, so no individual is preferred.
        # Build an ndarray here as well so that fitness_probs has the same
        # type regardless of which branch produced it.
        self.fitness_probs = np.full(self.population, 1.0 / self.population)
    else:
        self.fitness_probs = fitness / fitness_sum

    self.best_fitness.append(self.fitness_probs.max())
def crossover(
self, parent1: npt.NDArray, parent2: npt.NDArray
) -> Tuple[npt.NDArray, npt.NDArray]:
"""The crossover step when creating a new generation.
Args:
parent1 (npt.NDArray): The first parent to do crossover with.
parent2 (npt.NDArray): The second parent to do crossover with.
Return:
Tuple: The two new individuals for the next generation.
"""
def crossover(self, parent1: npt.NDArray, parent2: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]:
if self.crossover_prob < np.random.random(): if self.crossover_prob < np.random.random():
return (parent1, parent2) return (parent1, parent2)
cut: int = np.random.randint(0, self.genes) cut: int = np.random.randint(0, self.genes)
offspring1: npt.NDArray = parent1[:cut] offspring1 = parent1[:cut]
offspring2: npt.NDArray = parent2[:cut] offspring2 = parent2[:cut]
# Add the elements not in parent2 as close to in order as possible. offspring1 = np.concatenate((offspring1, np.array([gene for gene in parent2 if gene not in offspring1])))
offspring1 = np.concatenate( offspring2 = np.concatenate((offspring2, np.array([gene for gene in parent1 if gene not in offspring2])))
(offspring1, np.array([gene for gene in parent2 if gene not in offspring1]))
)
# Append the genes of parent1 that are not already in offspring2, keeping parent1's order.
offspring2 = np.concatenate(
(offspring2, np.array([gene for gene in parent1 if gene not in offspring2]))
)
return (offspring1, offspring2) return (offspring1, offspring2)
def mutate(self, individual: npt.NDArray) -> None:
"""The mutation step when creating a new generation.
Args: def mutate(self, offspring):
individual (npt.NDArray): The individual to potentially mutate.
Returns:
None
"""
# Decide whether or not to mutate.
if self.mutation_prob < np.random.random(): if self.mutation_prob < np.random.random():
return return
pos1: int = np.random.randint(0, self.genes) pos1: int = np.random.randint(0, self.genes)
pos2: int = np.random.randint(0, self.genes) pos2: int = np.random.randint(0, self.genes)
individual[[pos1, pos2]] = individual[[pos2, pos1]] offspring[pos1], offspring[pos2] = offspring[pos2], offspring[pos1]
def select_individual(self) -> npt.NDArray: def select_individual(self):
"""Select an individual using the fitness probabilities. choice = np.random.choice(self.population, 1, p=self.fitness_probs)[0]
return self.candidates[choice]
Returns:
npt.NDArray: The individual that has been selected.
"""
choice: int = np.random.choice(self.population, 1, p=self.fitness_probs)[0] def generate_next_generation(self):
new_generation = []
return self.individuals[choice]
def generate_next_generation(self) -> None:
"""Create the next generation of individuals.
Returns:
None
"""
new_generation: List = []
self.fitness() self.fitness()
offspring1: npt.NDArray for _ in range(0,self.population,2):
offspring2: npt.NDArray offspring1, offspring2 = self.crossover(self.select_individual(), self.select_individual())
# For each individual, create a new individual
for _ in range(0, self.population, 2):
# Select 2 individuals and perform crossover.
offspring1, offspring2 = self.crossover(
self.select_individual(), self.select_individual()
)
self.mutate(offspring1) self.mutate(offspring1)
self.mutate(offspring2) self.mutate(offspring2)
new_generation.append(offspring1) new_generation.append(offspring1)
new_generation.append(offspring2) new_generation.append(offspring2)
self.individuals = np.array(new_generation[: self.population]) self.candidates = np.array(new_generation)
def run(self, generations: int = 10) -> Self:
"""Run the genetic algorithm for a certain amount of generations.
Args:
generations (int): the number of generations to run the algorithm.
Returns:
Self: Itself, so that we can use ProcessPoolExecutor.
"""
def run(self, generations = 10):
for _ in range(generations): for _ in range(generations):
self.generate_next_generation() self.generate_next_generation()
return self def get_candidates(self):
return self.candidates
def get_individuals(self) -> npt.NDArray:
    """Get all individuals of the current generation.

    Returns:
        npt.NDArray: The array containing each individual.
    """
    return self.individuals
def get_best_individual(self) -> Tuple[float, npt.NDArray]:
    """Get the best individual from all the individuals.

    The best individual is the one whose circuit is shortest according to
    ``self.get_distance``. If several individuals share the shortest
    distance, the first one in ``self.individuals`` is returned.

    Returns:
        Tuple[float, npt.NDArray]: A tuple with the distance and permutation
        of the best individual.

    Raises:
        ValueError: If the population is empty.
    """
    # A single linear scan is enough; there is no need to sort the whole
    # population just to read its first element.
    return min(
        (
            (self.get_distance(individual), individual)
            for individual in self.individuals
        ),
        key=lambda scored: scored[0],
    )
def test_best_params(
    data: npt.NDArray,
    population: int = 50,
    generations: int = 100,
    seed: int = 1987,
) -> Tuple[float, float, float]:
    """Grid-search the crossover and mutation probabilities.

    Every combination of crossover and mutation probability taken from
    ``np.linspace(0.1, 1, 10)`` is tried. For each combination a fresh
    ``GeneticTSP`` is run and the distance of its best individual is
    compared with the best distance seen so far.

    Args:
        data (npt.NDArray): The distance matrix handed to ``GeneticTSP``.
        population (int): The population size used for every run.
        generations (int): The number of generations each run lasts.
        seed (int): The seed given to ``np.random.seed`` before every run.

    Returns:
        Tuple[float, float, float]: The best distance found, and the
        crossover and mutation probabilities that produced it.
    """
    probabilities: npt.NDArray = np.linspace(0.1, 1, 10)

    best_distance: float = float("inf")
    best_crossover_prob: float = 0.0
    best_mutation_prob: float = 0.0

    for c_prob in probabilities:
        for m_prob in probabilities:
            # Re-seed before every run so that each parameter combination
            # starts from the same random state.
            np.random.seed(seed)
            gen = GeneticTSP(population, c_prob, m_prob, data)
            gen.run(generations)
            distance, _ = gen.get_best_individual()
            if distance < best_distance:
                best_distance = distance
                best_crossover_prob = c_prob
                best_mutation_prob = m_prob

    # Convert the NumPy scalars to plain floats to match the annotation.
    return (
        float(best_distance),
        float(best_crossover_prob),
        float(best_mutation_prob),
    )
if __name__ == "__main__": if __name__ == "__main__":
cities, data = read_data("./european_cities.csv") cities, data = read_data("./european_cities.csv")
np.random.seed(1987) np.random.seed(1987)
gen = GeneticTSP(500, .8, .4, data[:10,:10])
# print(f"Finding the best parameters for the mutation and crossover probabilities") original_cands = gen.get_candidates()
# bd, bc, bm = test_best_params(data[:10, :10]) res = [ (gen.get_distance(cand), cand) for cand in original_cands ]
res.sort(key=lambda i: i[0])
# print(f"Best distance : {bd}") print(f"Original population")
# print(f"Best crossover probability: {bc}") print(f"Distance: {res[0][0]}")
# print(f"Best mutation probability : {bm}\n") plot_plan([ cities[i] for i in res[-1][1] ])
bd, bc, bm = 0, 0.7, 0.1 gen.run(500)
populations = [10, 100, 1000]
generations = 300
avg_fitness = []
# 10 cities arr = gen.get_candidates()
print(f"Results for 10 cities.") res = [ (gen.get_distance(cand), cand) for cand in arr ]
print(f"generations : {generations}") res.sort(key=lambda i: i[0])
print(f"Crossover probability: {bc}")
print(f"Mutation probability : {bm}\n")
for population in populations: print(f"Improved population")
arr = [GeneticTSP(population, bc, bm, data[:10, :10]) for _ in range(20)] print(f"Distance: {res[0][0]}")
plot_plan([ cities[i] for i in res[0][1] ])
t0 = time_ns()
futures = []
with ProcessPoolExecutor() as executor:
for obj in arr:
futures.append(executor.submit(obj.run, generations))
wait(futures)
t1 = time_ns()
t = (t1 - t0) / 1_000_000.0
arr = [future.result() for future in futures]
res = sorted(
list(map(lambda gen: gen.get_best_individual(), arr)),
key=lambda i: i[0],
)
distances = list(map(lambda n: n[0], res))
best = res[0][0]
worst = res[-1][0]
mean = sum(distances) / len(res)
std_dev = np.sqrt(sum([(i - mean) ** 2 for i in distances]) / len(res))
print(f"Results for a population of {population}.")
print(f"time : {t:>12.6f}ms")
print(f"best distance : {best:>12.6f}km")
print(f"worst distance : {worst:>12.6f}km")
print(f"average distance : {mean:>12.6f}km")
print(f"standard deviation: {std_dev:>12.6f}km\n")
fitness = np.array([gen.best_fitness for gen in arr])
avg_fitness.append((fitness.sum(axis=0) / len(arr), f"cities: 10, population: {population}"))
# ax.plot(
# np.arange(len(avg_fitness)),
# avg_fitness,
# label=f"cities: 10, population: {population}",
# )
plot_plan(indexes_to_cities(res[0][1], cities))
# 24 cities
print(f"Results for 24 cities.")
print(f"Crossover probability: {bc}")
print(f"Mutation probability : {bm}\n")
for population in populations:
arr = [GeneticTSP(population, bc, bm, data) for _ in range(20)]
t0 = time_ns()
futures = []
with ProcessPoolExecutor() as executor:
for obj in arr:
futures.append(executor.submit(obj.run, generations))
wait(futures)
t1 = time_ns()
t = (t1 - t0) / 1_000_000.0
arr = [future.result() for future in futures]
res = sorted(
list(map(lambda gen: gen.get_best_individual(), arr)),
key=lambda i: i[0],
)
distances = list(map(lambda n: n[0], res))
best = res[0][0]
worst = res[-1][0]
mean = sum(distances) / len(res)
std_dev = np.sqrt(sum([(i - mean) ** 2 for i in distances]) / len(res))
print(f"Results for a population of {population}.")
print(f"time : {t:>12.6f}ms")
print(f"best distance : {best:>12.6f}km")
print(f"worst distance : {worst:>12.6f}km")
print(f"average distance : {mean:>12.6f}km")
print(f"standard deviation: {std_dev:>12.6f}km\n")
# One row of per-generation best fitness values for each of the runs in arr.
fitness = np.array([gen.best_fitness for gen in arr])
# Average over the runs; the label must name the 24-city data set used in
# this section so the legend entries differ from the 10-city curves.
avg_fitness.append((fitness.sum(axis=0) / len(arr), f"cities: 24, population: {population}"))
# ax.plot(
# np.arange(len(avg_fitness)),
# avg_fitness,
# label=f"cities: 24, population: {population}",
# )
plot_plan(indexes_to_cities(res[0][1], cities))
# Plot the average best fitnesses
fig, ax = plt.subplots()
x = np.arange(len(avg_fitness[0][0]))
for element in avg_fitness:
ax.plot(x, element[0], label=element[1])
ax.set_xlabel("generations")
ax.set_ylabel("avg best fitness")
fig.legend()
fig.savefig("./images/average_fitness.png")
"""Running example
oblig1 on main [!?] via 🐍 v3.12.6 took 4m37s
python genetic_algorithm.py
Results for 10 cities.
generations : 300
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 717.914256ms
best distance : 7486.310000km
worst distance : 8737.340000km
average distance : 7634.319500km
standard deviation: 283.926327km
Results for a population of 100.
time : 6922.343884ms
best distance : 7486.310000km
worst distance : 7830.010000km
average distance : 7529.078000km
standard deviation: 88.041417km
Results for a population of 1000.
time : 99066.816177ms
best distance : 7486.310000km
worst distance : 7549.160000km
average distance : 7495.113500km
standard deviation: 18.968296km
Results for 24 cities.
Crossover probability: 0.7
Mutation probability : 0.1
Results for a population of 10.
time : 1441.588712ms
best distance : 15921.410000km
worst distance : 20250.840000km
average distance : 18045.991500km
standard deviation: 1060.673071km
Results for a population of 100.
time : 15143.475494ms
best distance : 13148.120000km
worst distance : 17453.060000km
average distance : 15048.892000km
standard deviation: 1083.912052km
Results for a population of 1000.
time : 151313.539435ms
best distance : 12890.050000km
worst distance : 15798.380000km
average distance : 14121.351500km
standard deviation: 924.716247km
"""

View File

@ -59,29 +59,8 @@ def hill_climbing(distances: npt.NDArray) -> Tuple[float, npt.NDArray]:
return (current_distance, perm) return (current_distance, perm)
def test_hill_climbing(data: npt.NDArray, cities: npt.NDArray, runs: int):
    """Run hill climbing several times and print summary statistics.

    Args:
        data (npt.NDArray): The distance matrix handed to ``hill_climbing``.
        cities (npt.NDArray): Passed to ``indexes_to_cities`` together with
            the best permutation before plotting.
        runs (int): The number of times ``hill_climbing`` is run.
    """
    # Order the (distance, permutation) results from shortest to longest.
    results = sorted((hill_climbing(data) for _ in range(runs)), key=lambda r: r[0])
    distances = [result[0] for result in results]

    best = distances[0]
    worst = distances[-1]
    mean = sum(distances) / runs
    variance = sum([(d - mean) ** 2 for d in distances]) / runs
    std_dev = np.sqrt(variance)

    print(f"Hill climbing for {len(data)} cities.")
    print(f"best distance : {best:>12.6f}km")
    print(f"worst distance : {worst:>12.6f}km")
    print(f"average distance : {mean:>12.6f}km")
    print(f"standard deviation: {std_dev:>12.6f}km\n")

    # Plot the best one
    plot_plan(indexes_to_cities(results[0][1], cities))
if __name__ == "__main__": if __name__ == "__main__":
np.random.seed(1987)
cities, data = read_data("./european_cities.csv") cities, data = read_data("./european_cities.csv")
distance, perm = hill_climbing(data[:10, :10]) distance, perm = hill_climbing(data[:10, :10])
# plot_plan(indexes_to_cities(perm, cities)) plot_plan(indexes_to_cities(perm, cities))
test_hill_climbing(data[:10,:10], cities, 20)
test_hill_climbing(data, cities, 20)