from __future__ import annotations
import time
from typing import List, Dict, Any
from networkx import Graph
from pulp import LpVariable, LpProblem, LpMinimize, LpInteger, lpSum, LpStatus, LpStatusInfeasible, CPLEX_CMD
from Loggable import Loggable
from utils import config
from exceptions.InfeasibleSolutionException import InfeasibleSolutionException
[docs]
class KLabeling(Loggable):
def __init__(self, g: Graph, layers: int = 1):
super().__init__()
self.g = g
self.layers = layers
self.objective = -1
self.labeling = dict()
self.start_time = None
self.end_time = None
[docs]
def get_log(self) -> List[Dict[str, Any]] | Dict[str, Any]:
layer_to_node_count = dict()
for node, layers in self.labeling[2].items():
for layer in layers:
if layer not in layer_to_node_count:
layer_to_node_count[layer] = 0
layer_to_node_count[layer] += 1
return {
"labeling_method": self.__class__.__name__,
"solution": {
"objective": self.objective,
"layers": layer_to_node_count,
}
}
[docs]
def label_alt(self):
print("Number of nodes: {}".format(len(self.g.nodes)))
print("Number of edges: {}".format(len(self.g.edges)))
print("Layers: {}".format(self.layers))
self.start_time = time.time()
solver = CPLEX_CMD(path=config.cplex_path, msg=False, keepFiles=config.keep_files, timeLimit=config.time_limit)
cmbs = [(i, i + 1) for i in range(self.layers)]
cmbs.extend([(i + 1, i) for i in range(self.layers)])
bounds = ["l", "u"]
# Variables
x_vars = LpVariable.dicts("x", (self.g.nodes, bounds), 1, self.layers + 1, cat=LpInteger)
s_vars = LpVariable.dicts("s", (self.g.edges, cmbs), 0, 1, LpInteger)
d_vars = LpVariable.dicts("d", self.g.nodes, cat=LpInteger)
# The semiperimeter
S = LpVariable("S", 0, cat=LpInteger)
lpvc = LpProblem("VC", LpMinimize)
if config.objective == "semi":
lpvc += S
else:
lpvc += 1
lpvc += lpSum(d_vars) == S
M = 10000
for e in self.g.edges:
for l in [i for i in range(self.layers)]:
lpvc += x_vars[e[0]]["l"] <= (l + 1) * (M * (1 - s_vars[e][(l, l + 1)]) + 1)
lpvc += x_vars[e[0]]["u"] >= (l + 1) * s_vars[e][(l, l + 1)]
lpvc += x_vars[e[1]]["l"] <= (l + 2) * (M * (1 - s_vars[e][(l, l + 1)]) + 1)
lpvc += x_vars[e[1]]["u"] >= (l + 2) * s_vars[e][(l, l + 1)]
lpvc += x_vars[e[0]]["l"] <= (l + 2) * (M * (1 - s_vars[e][(l + 1, l)]) + 1)
lpvc += x_vars[e[0]]["u"] >= (l + 2) * s_vars[e][(l + 1, l)]
lpvc += x_vars[e[1]]["l"] <= (l + 1) * (M * (1 - s_vars[e][(l + 1, l)]) + 1)
lpvc += x_vars[e[1]]["u"] >= (l + 1) * s_vars[e][(l + 1, l)]
lpvc += lpSum(s_vars[e]) == 1
for v in self.g.nodes:
lpvc += d_vars[v] == x_vars[v]["u"] - x_vars[v]["l"] + 1
lpvc += x_vars[v]["u"] >= x_vars[v]["l"]
# for l in range(self.layers + 1):
# if l % 2 == 0:
# lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= config.max_rows
# else:
# lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= config.max_columns
# Required constraint: root node and leaf node must be given a label V
if config.io_constraints:
for (v, d) in self.g.nodes(data=True):
if d["root"]:
if self.layers % 2 == 0:
lpvc += x_vars[v]["u"] == self.layers
else:
lpvc += x_vars[v]["u"] == self.layers - 1
if d["terminal"]:
lpvc += x_vars[v]["l"] == 0
lpvc.solve(solver)
if lpvc.status == LpStatusInfeasible:
raise InfeasibleSolutionException("Infeasible solution.")
self.end_time = time.time()
self.log += 'ILP time (s): {}\n'.format(self.end_time - self.start_time)
# for v in self.g.nodes:
# for l in range(self.layers + 1):
# print("v_{}_{} = {}".format(v, l, x_vars[v][l].varValue))
rows = 0
columns = 0
bucket = [0 for l in range(self.layers + 1)]
for v in self.g.nodes:
lower = int(round(x_vars[v]["l"].varValue))
upper = int(round(x_vars[v]["u"].varValue))
print("{}: {} {}".format(v, lower, upper))
for i in range(lower, upper + 1):
bucket[i - 1] += 1
for l in range(self.layers + 1):
s = bucket[l]
print("Layer {}: {}".format(l, s))
self.log += 'Layer {}: {}\n'.format(l, s)
if l % 2 == 0 and s > rows:
rows = s
elif l % 2 == 1 and s > columns:
columns = s
self.log += 'Rows: {}\n'.format(rows)
self.log += 'Columns: {}\n'.format(columns)
edge_assignments = dict()
for e in self.g.edges:
for l in range(self.layers):
if int(round(s_vars[e][(l, l + 1)].varValue)) == 1:
edge_assignments[e] = (l, l + 1)
if int(round(s_vars[e][(l + 1, l)].varValue)) == 1:
edge_assignments[e] = (l + 1, l)
node_assignments = dict()
for v in self.g.nodes:
node_assignments[v] = []
for v in self.g.nodes:
lower = int(round(x_vars[v]["l"].varValue))
upper = int(round(x_vars[v]["u"].varValue))
for i in range(lower, upper + 1):
node_assignments[v].append(i - 1)
self.labeling = (rows, columns, node_assignments, edge_assignments)
# if config.verbose:
print("Status:", LpStatus[lpvc.status])
print("Sum = " + str(S.varValue))
# print('Node assignments: {}\n'.format(node_assignments))
# print('Edge assignments: {}\n'.format(edge_assignments))
config.log.add(self.get_log())
return self.labeling
[docs]
def label(self):
print("Number of nodes: {}".format(len(self.g.nodes)))
print("Number of edges: {}".format(len(self.g.edges)))
print("Layers: {}".format(self.layers))
self.start_time = time.time()
solver = CPLEX_CMD(path=config.cplex_path, msg=False, keepFiles=config.keep_files, timeLimit=config.time_limit,
logPath=str(config.root.joinpath("cplex.log")))
cmbs = [(i, i + 1) for i in range(self.layers)]
cmbs.extend([(i + 1, i) for i in range(self.layers)])
# Variables
x_vars = LpVariable.dicts("x", (self.g.nodes, [i for i in range(self.layers + 1)]), 0, 1, LpInteger)
s_vars = LpVariable.dicts("s", (self.g.edges, cmbs), 0, 1, LpInteger)
d_vars = LpVariable.dicts("d", self.g.nodes, cat=LpInteger)
r = LpVariable("r", cat=LpInteger)
c = LpVariable("c", cat=LpInteger)
# The semiperimeter
S = LpVariable("S", 0, cat=LpInteger)
lpvc = LpProblem("VC", LpMinimize)
if config.objective == "semi":
lpvc += S
else:
lpvc += 1
lpvc += S == r + c
for e in self.g.edges:
for l in [i for i in range(self.layers)]:
lpvc += lpSum(x_vars[e[0]][l] + x_vars[e[1]][l + 1]) >= 2 * s_vars[e][(l, l + 1)]
lpvc += lpSum(x_vars[e[0]][l + 1] + x_vars[e[1]][l]) >= 2 * s_vars[e][(l + 1, l)]
lpvc += lpSum(s_vars[e]) == 1
for v in self.g.nodes:
lpvc += lpSum(x_vars[v]) == d_vars[v]
for l1 in range(self.layers):
for l2 in range(l1 + 1, self.layers + 1):
lpvc += 10000 * (1 - (x_vars[v][l1] + x_vars[v][l2] - 1)) + d_vars[v] >= l2 - l1 + 1
for l in range(self.layers + 1):
if l % 2 == 0:
lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= r
lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= config.max_rows
else:
lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= c
lpvc += lpSum([x_vars[v][l] for v in self.g.nodes]) <= config.max_columns
# Required constraint: root node and leaf node must be given a label V
if config.io_constraints:
for (v, d) in self.g.nodes(data=True):
if d["root"]:
if config.output_layer is not None:
lpvc += x_vars[v][config.output_layer] == 1
else:
if self.layers % 2 == 0:
lpvc += x_vars[v][self.layers] == 1
else:
lpvc += x_vars[v][self.layers - 1] == 1
elif d["terminal"]:
if config.input_layer is not None:
lpvc += x_vars[v][config.input_layer] == 1
else:
lpvc += x_vars[v][0] == 1
# lpvc.writeLP('ilp.lp')
lpvc.solve(solver)
if lpvc.status == LpStatusInfeasible:
raise InfeasibleSolutionException("Infeasible solution.")
self.end_time = time.time()
print("Status: ", LpStatus[lpvc.status])
print("Objective: " + str(S.varValue))
self.objective = S.varValue
rows = 0
columns = 0
for l in range(self.layers + 1):
s = sum([int(round(x_vars[v][l].varValue)) for v in self.g.nodes])
print("Layer {}: {}".format(l, s))
if l % 2 == 0 and s > rows:
rows = s
elif l % 2 == 1 and s > columns:
columns = s
edge_assignments = dict()
for e in self.g.edges:
for l in range(self.layers):
if int(round(s_vars[e][(l, l + 1)].varValue)) == 1:
edge_assignments[e] = (l, l + 1)
if int(round(s_vars[e][(l + 1, l)].varValue)) == 1:
edge_assignments[e] = (l + 1, l)
node_assignments = dict()
for v in self.g.nodes:
node_assignments[v] = []
for v in self.g.nodes:
for l in range(self.layers + 1):
if int(round(x_vars[v][l].varValue)) == 1:
node_assignments[v].append(l)
self.labeling = (rows, columns, node_assignments, edge_assignments)
# gap = 0
# cplex_log_file_name = config.root.joinpath("cplex.log")
# if cplex_log_file_name.is_file():
# with open(str(cplex_log_file_name), 'r') as f:
# for line in f.readlines():
# if "gap" in line:
# gap = float(re.findall(r'(\d+\.\d+)\%', line)[0])
# config.log.add("Gap (%): {}\n".format(gap))
return self.labeling