From 19bff003aa1c365bec86d3f4a13a9c3d6c36d230 Mon Sep 17 00:00:00 2001 From: Christian Clauss Date: Fri, 28 Oct 2022 15:54:54 +0200 Subject: [PATCH] Adopt Python >= 3.8 assignment expressions using auto-walrus (#7737) * Adopt Python >= 3.8 assignment expressions using auto-walrus * updating DIRECTORY.md * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Co-authored-by: github-actions <${GITHUB_ACTOR}@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 5 + DIRECTORY.md | 3 + ciphers/enigma_machine2.py | 3 +- .../linked_list/doubly_linked_list_two.py | 3 +- dynamic_programming/fibonacci.py | 3 +- .../sequential_minimum_optimization.py | 1261 ++++++++--------- strings/indian_phone_validator.py | 3 +- 7 files changed, 642 insertions(+), 639 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5bdda50be..7f6c206b4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,11 @@ repos: )$ - id: requirements-txt-fixer + - repo: https://github.com/MarcoGorelli/auto-walrus + rev: v0.2.1 + hooks: + - id: auto-walrus + - repo: https://github.com/psf/black rev: 22.10.0 hooks: diff --git a/DIRECTORY.md b/DIRECTORY.md index ba7d3e62a..7621427a6 100644 --- a/DIRECTORY.md +++ b/DIRECTORY.md @@ -45,6 +45,7 @@ * [Count 1S Brian Kernighan Method](bit_manipulation/count_1s_brian_kernighan_method.py) * [Count Number Of One Bits](bit_manipulation/count_number_of_one_bits.py) * [Gray Code Sequence](bit_manipulation/gray_code_sequence.py) + * [Highest Set Bit](bit_manipulation/highest_set_bit.py) * [Is Even](bit_manipulation/is_even.py) * [Reverse Bits](bit_manipulation/reverse_bits.py) * [Single Bit Manipulation Operations](bit_manipulation/single_bit_manipulation_operations.py) @@ -326,6 +327,7 @@ ## Financial * [Equated Monthly Installments](financial/equated_monthly_installments.py) * [Interest](financial/interest.py) + * [Price Plus Tax](financial/price_plus_tax.py) ## Fractals * [Julia Sets](fractals/julia_sets.py) @@ -669,6 +671,7 @@ * [Horizontal Projectile Motion](physics/horizontal_projectile_motion.py) * [Kinetic Energy](physics/kinetic_energy.py) * [Lorentz Transformation Four Vector](physics/lorentz_transformation_four_vector.py) + * [Malus Law](physics/malus_law.py) * [N Body Simulation](physics/n_body_simulation.py) * [Newtons Law Of Gravitation](physics/newtons_law_of_gravitation.py) * [Newtons Second Law Of Motion](physics/newtons_second_law_of_motion.py) diff --git a/ciphers/enigma_machine2.py b/ciphers/enigma_machine2.py index 9f9dbe6f7..a877256eb 100644 --- a/ciphers/enigma_machine2.py +++ b/ciphers/enigma_machine2.py @@ -86,8 +86,7 @@ def _validator( """ # Checks if there are 3 unique rotors - unique_rotsel = len(set(rotsel)) - if unique_rotsel < 3: + if (unique_rotsel := len(set(rotsel))) < 3: raise Exception(f"Please use 3 unique rotors (not {unique_rotsel})") # Checks if rotor positions are valid diff --git a/data_structures/linked_list/doubly_linked_list_two.py b/data_structures/linked_list/doubly_linked_list_two.py index 184b6966b..94b916a62 100644 --- a/data_structures/linked_list/doubly_linked_list_two.py +++ b/data_structures/linked_list/doubly_linked_list_two.py @@ -143,9 +143,8 @@ class LinkedList: raise Exception("Node not found") def delete_value(self, value): - node = self.get_node(value) - if node is not None: + if (node := self.get_node(value)) is not None: if node == self.head: 
self.head = self.head.get_next() diff --git a/dynamic_programming/fibonacci.py b/dynamic_programming/fibonacci.py index 4abc60d4f..7ec5993ef 100644 --- a/dynamic_programming/fibonacci.py +++ b/dynamic_programming/fibonacci.py @@ -18,8 +18,7 @@ class Fibonacci: >>> Fibonacci().get(5) [0, 1, 1, 2, 3] """ - difference = index - (len(self.sequence) - 2) - if difference >= 1: + if (difference := index - (len(self.sequence) - 2)) >= 1: for _ in range(difference): self.sequence.append(self.sequence[-1] + self.sequence[-2]) return self.sequence[:index] diff --git a/machine_learning/sequential_minimum_optimization.py b/machine_learning/sequential_minimum_optimization.py index df5b03790..66535e806 100644 --- a/machine_learning/sequential_minimum_optimization.py +++ b/machine_learning/sequential_minimum_optimization.py @@ -1,631 +1,630 @@ -""" - Implementation of sequential minimal optimization (SMO) for support vector machines - (SVM). - - Sequential minimal optimization (SMO) is an algorithm for solving the quadratic - programming (QP) problem that arises during the training of support vector - machines. - It was invented by John Platt in 1998. - -Input: - 0: type: numpy.ndarray. - 1: first column of ndarray must be tags of samples, must be 1 or -1. - 2: rows of ndarray represent samples. - -Usage: - Command: - python3 sequential_minimum_optimization.py - Code: - from sequential_minimum_optimization import SmoSVM, Kernel - - kernel = Kernel(kernel='poly', degree=3., coef0=1., gamma=0.5) - init_alphas = np.zeros(train.shape[0]) - SVM = SmoSVM(train=train, alpha_list=init_alphas, kernel_func=kernel, cost=0.4, - b=0.0, tolerance=0.001) - SVM.fit() - predict = SVM.predict(test_samples) - -Reference: - https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/smo-book.pdf - https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/tr-98-14.pdf -""" - - -import os -import sys -import urllib.request - -import numpy as np -import pandas as pd -from matplotlib import pyplot as plt -from sklearn.datasets import make_blobs, make_circles -from sklearn.preprocessing import StandardScaler - -CANCER_DATASET_URL = ( - "https://archive.ics.uci.edu/ml/machine-learning-databases/" - "breast-cancer-wisconsin/wdbc.data" -) - - -class SmoSVM: - def __init__( - self, - train, - kernel_func, - alpha_list=None, - cost=0.4, - b=0.0, - tolerance=0.001, - auto_norm=True, - ): - self._init = True - self._auto_norm = auto_norm - self._c = np.float64(cost) - self._b = np.float64(b) - self._tol = np.float64(tolerance) if tolerance > 0.0001 else np.float64(0.001) - - self.tags = train[:, 0] - self.samples = self._norm(train[:, 1:]) if self._auto_norm else train[:, 1:] - self.alphas = alpha_list if alpha_list is not None else np.zeros(train.shape[0]) - self.Kernel = kernel_func - - self._eps = 0.001 - self._all_samples = list(range(self.length)) - self._K_matrix = self._calculate_k_matrix() - self._error = np.zeros(self.length) - self._unbound = [] - - self.choose_alpha = self._choose_alphas() - - # Calculate alphas using SMO algorithm - def fit(self): - k = self._k - state = None - while True: - - # 1: Find alpha1, alpha2 - try: - i1, i2 = self.choose_alpha.send(state) - state = None - except StopIteration: - print("Optimization done!\nEvery sample satisfy the KKT condition!") - break - - # 2: calculate new alpha2 and new alpha1 - y1, y2 = self.tags[i1], self.tags[i2] - a1, a2 = self.alphas[i1].copy(), self.alphas[i2].copy() - e1, e2 = self._e(i1), self._e(i2) - args = (i1, i2, a1, a2, e1, e2, y1, y2) - a1_new, 
a2_new = self._get_new_alpha(*args) - if not a1_new and not a2_new: - state = False - continue - self.alphas[i1], self.alphas[i2] = a1_new, a2_new - - # 3: update threshold(b) - b1_new = np.float64( - -e1 - - y1 * k(i1, i1) * (a1_new - a1) - - y2 * k(i2, i1) * (a2_new - a2) - + self._b - ) - b2_new = np.float64( - -e2 - - y2 * k(i2, i2) * (a2_new - a2) - - y1 * k(i1, i2) * (a1_new - a1) - + self._b - ) - if 0.0 < a1_new < self._c: - b = b1_new - if 0.0 < a2_new < self._c: - b = b2_new - if not (np.float64(0) < a2_new < self._c) and not ( - np.float64(0) < a1_new < self._c - ): - b = (b1_new + b2_new) / 2.0 - b_old = self._b - self._b = b - - # 4: update error value,here we only calculate those non-bound samples' - # error - self._unbound = [i for i in self._all_samples if self._is_unbound(i)] - for s in self.unbound: - if s == i1 or s == i2: - continue - self._error[s] += ( - y1 * (a1_new - a1) * k(i1, s) - + y2 * (a2_new - a2) * k(i2, s) - + (self._b - b_old) - ) - - # if i1 or i2 is non-bound,update there error value to zero - if self._is_unbound(i1): - self._error[i1] = 0 - if self._is_unbound(i2): - self._error[i2] = 0 - - # Predict test samples - def predict(self, test_samples, classify=True): - - if test_samples.shape[1] > self.samples.shape[1]: - raise ValueError( - "Test samples' feature length does not equal to that of train samples" - ) - - if self._auto_norm: - test_samples = self._norm(test_samples) - - results = [] - for test_sample in test_samples: - result = self._predict(test_sample) - if classify: - results.append(1 if result > 0 else -1) - else: - results.append(result) - return np.array(results) - - # Check if alpha violate KKT condition - def _check_obey_kkt(self, index): - alphas = self.alphas - tol = self._tol - r = self._e(index) * self.tags[index] - c = self._c - - return (r < -tol and alphas[index] < c) or (r > tol and alphas[index] > 0.0) - - # Get value calculated from kernel function - def _k(self, i1, i2): - # for test samples,use Kernel function - if isinstance(i2, np.ndarray): - return self.Kernel(self.samples[i1], i2) - # for train samples,Kernel values have been saved in matrix - else: - return self._K_matrix[i1, i2] - - # Get sample's error - def _e(self, index): - """ - Two cases: - 1:Sample[index] is non-bound,Fetch error from list: _error - 2:sample[index] is bound,Use predicted value deduct true value: g(xi) - yi - - """ - # get from error data - if self._is_unbound(index): - return self._error[index] - # get by g(xi) - yi - else: - gx = np.dot(self.alphas * self.tags, self._K_matrix[:, index]) + self._b - yi = self.tags[index] - return gx - yi - - # Calculate Kernel matrix of all possible i1,i2 ,saving time - def _calculate_k_matrix(self): - k_matrix = np.zeros([self.length, self.length]) - for i in self._all_samples: - for j in self._all_samples: - k_matrix[i, j] = np.float64( - self.Kernel(self.samples[i, :], self.samples[j, :]) - ) - return k_matrix - - # Predict test sample's tag - def _predict(self, sample): - k = self._k - predicted_value = ( - np.sum( - [ - self.alphas[i1] * self.tags[i1] * k(i1, sample) - for i1 in self._all_samples - ] - ) - + self._b - ) - return predicted_value - - # Choose alpha1 and alpha2 - def _choose_alphas(self): - locis = yield from self._choose_a1() - if not locis: - return - return locis - - def _choose_a1(self): - """ - Choose first alpha ;steps: - 1:First loop over all sample - 2:Second loop over all non-bound samples till all non-bound samples does not - voilate kkt condition. 
- 3:Repeat this two process endlessly,till all samples does not voilate kkt - condition samples after first loop. - """ - while True: - all_not_obey = True - # all sample - print("scanning all sample!") - for i1 in [i for i in self._all_samples if self._check_obey_kkt(i)]: - all_not_obey = False - yield from self._choose_a2(i1) - - # non-bound sample - print("scanning non-bound sample!") - while True: - not_obey = True - for i1 in [ - i - for i in self._all_samples - if self._check_obey_kkt(i) and self._is_unbound(i) - ]: - not_obey = False - yield from self._choose_a2(i1) - if not_obey: - print("all non-bound samples fit the KKT condition!") - break - if all_not_obey: - print("all samples fit the KKT condition! Optimization done!") - break - return False - - def _choose_a2(self, i1): - """ - Choose the second alpha by using heuristic algorithm ;steps: - 1: Choose alpha2 which gets the maximum step size (|E1 - E2|). - 2: Start in a random point,loop over all non-bound samples till alpha1 and - alpha2 are optimized. - 3: Start in a random point,loop over all samples till alpha1 and alpha2 are - optimized. - """ - self._unbound = [i for i in self._all_samples if self._is_unbound(i)] - - if len(self.unbound) > 0: - tmp_error = self._error.copy().tolist() - tmp_error_dict = { - index: value - for index, value in enumerate(tmp_error) - if self._is_unbound(index) - } - if self._e(i1) >= 0: - i2 = min(tmp_error_dict, key=lambda index: tmp_error_dict[index]) - else: - i2 = max(tmp_error_dict, key=lambda index: tmp_error_dict[index]) - cmd = yield i1, i2 - if cmd is None: - return - - for i2 in np.roll(self.unbound, np.random.choice(self.length)): - cmd = yield i1, i2 - if cmd is None: - return - - for i2 in np.roll(self._all_samples, np.random.choice(self.length)): - cmd = yield i1, i2 - if cmd is None: - return - - # Get the new alpha2 and new alpha1 - def _get_new_alpha(self, i1, i2, a1, a2, e1, e2, y1, y2): - k = self._k - if i1 == i2: - return None, None - - # calculate L and H which bound the new alpha2 - s = y1 * y2 - if s == -1: - l, h = max(0.0, a2 - a1), min(self._c, self._c + a2 - a1) - else: - l, h = max(0.0, a2 + a1 - self._c), min(self._c, a2 + a1) - if l == h: # noqa: E741 - return None, None - - # calculate eta - k11 = k(i1, i1) - k22 = k(i2, i2) - k12 = k(i1, i2) - eta = k11 + k22 - 2.0 * k12 - - # select the new alpha2 which could get the minimal objectives - if eta > 0.0: - a2_new_unc = a2 + (y2 * (e1 - e2)) / eta - # a2_new has a boundary - if a2_new_unc >= h: - a2_new = h - elif a2_new_unc <= l: - a2_new = l - else: - a2_new = a2_new_unc - else: - b = self._b - l1 = a1 + s * (a2 - l) - h1 = a1 + s * (a2 - h) - - # way 1 - f1 = y1 * (e1 + b) - a1 * k(i1, i1) - s * a2 * k(i1, i2) - f2 = y2 * (e2 + b) - a2 * k(i2, i2) - s * a1 * k(i1, i2) - ol = ( - l1 * f1 - + l * f2 - + 1 / 2 * l1**2 * k(i1, i1) - + 1 / 2 * l**2 * k(i2, i2) - + s * l * l1 * k(i1, i2) - ) - oh = ( - h1 * f1 - + h * f2 - + 1 / 2 * h1**2 * k(i1, i1) - + 1 / 2 * h**2 * k(i2, i2) - + s * h * h1 * k(i1, i2) - ) - """ - # way 2 - Use objective function check which alpha2 new could get the minimal - objectives - """ - if ol < (oh - self._eps): - a2_new = l - elif ol > oh + self._eps: - a2_new = h - else: - a2_new = a2 - - # a1_new has a boundary too - a1_new = a1 + s * (a2 - a2_new) - if a1_new < 0: - a2_new += s * a1_new - a1_new = 0 - if a1_new > self._c: - a2_new += s * (a1_new - self._c) - a1_new = self._c - - return a1_new, a2_new - - # Normalise data using min_max way - def _norm(self, data): - if self._init: - 
self._min = np.min(data, axis=0) - self._max = np.max(data, axis=0) - self._init = False - return (data - self._min) / (self._max - self._min) - else: - return (data - self._min) / (self._max - self._min) - - def _is_unbound(self, index): - if 0.0 < self.alphas[index] < self._c: - return True - else: - return False - - def _is_support(self, index): - if self.alphas[index] > 0: - return True - else: - return False - - @property - def unbound(self): - return self._unbound - - @property - def support(self): - return [i for i in range(self.length) if self._is_support(i)] - - @property - def length(self): - return self.samples.shape[0] - - -class Kernel: - def __init__(self, kernel, degree=1.0, coef0=0.0, gamma=1.0): - self.degree = np.float64(degree) - self.coef0 = np.float64(coef0) - self.gamma = np.float64(gamma) - self._kernel_name = kernel - self._kernel = self._get_kernel(kernel_name=kernel) - self._check() - - def _polynomial(self, v1, v2): - return (self.gamma * np.inner(v1, v2) + self.coef0) ** self.degree - - def _linear(self, v1, v2): - return np.inner(v1, v2) + self.coef0 - - def _rbf(self, v1, v2): - return np.exp(-1 * (self.gamma * np.linalg.norm(v1 - v2) ** 2)) - - def _check(self): - if self._kernel == self._rbf: - if self.gamma < 0: - raise ValueError("gamma value must greater than 0") - - def _get_kernel(self, kernel_name): - maps = {"linear": self._linear, "poly": self._polynomial, "rbf": self._rbf} - return maps[kernel_name] - - def __call__(self, v1, v2): - return self._kernel(v1, v2) - - def __repr__(self): - return self._kernel_name - - -def count_time(func): - def call_func(*args, **kwargs): - import time - - start_time = time.time() - func(*args, **kwargs) - end_time = time.time() - print(f"smo algorithm cost {end_time - start_time} seconds") - - return call_func - - -@count_time -def test_cancel_data(): - print("Hello!\nStart test svm by smo algorithm!") - # 0: download dataset and load into pandas' dataframe - if not os.path.exists(r"cancel_data.csv"): - request = urllib.request.Request( - CANCER_DATASET_URL, - headers={"User-Agent": "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"}, - ) - response = urllib.request.urlopen(request) - content = response.read().decode("utf-8") - with open(r"cancel_data.csv", "w") as f: - f.write(content) - - data = pd.read_csv(r"cancel_data.csv", header=None) - - # 1: pre-processing data - del data[data.columns.tolist()[0]] - data = data.dropna(axis=0) - data = data.replace({"M": np.float64(1), "B": np.float64(-1)}) - samples = np.array(data)[:, :] - - # 2: dividing data into train_data data and test_data data - train_data, test_data = samples[:328, :], samples[328:, :] - test_tags, test_samples = test_data[:, 0], test_data[:, 1:] - - # 3: choose kernel function,and set initial alphas to zero(optional) - mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5) - al = np.zeros(train_data.shape[0]) - - # 4: calculating best alphas using SMO algorithm and predict test_data samples - mysvm = SmoSVM( - train=train_data, - kernel_func=mykernel, - alpha_list=al, - cost=0.4, - b=0.0, - tolerance=0.001, - ) - mysvm.fit() - predict = mysvm.predict(test_samples) - - # 5: check accuracy - score = 0 - test_num = test_tags.shape[0] - for i in range(test_tags.shape[0]): - if test_tags[i] == predict[i]: - score += 1 - print(f"\nall: {test_num}\nright: {score}\nfalse: {test_num - score}") - print(f"Rough Accuracy: {score / test_tags.shape[0]}") - - -def test_demonstration(): - # change stdout - print("\nStart plot,please wait!!!") - sys.stdout = 
open(os.devnull, "w") - - ax1 = plt.subplot2grid((2, 2), (0, 0)) - ax2 = plt.subplot2grid((2, 2), (0, 1)) - ax3 = plt.subplot2grid((2, 2), (1, 0)) - ax4 = plt.subplot2grid((2, 2), (1, 1)) - ax1.set_title("linear svm,cost:0.1") - test_linear_kernel(ax1, cost=0.1) - ax2.set_title("linear svm,cost:500") - test_linear_kernel(ax2, cost=500) - ax3.set_title("rbf kernel svm,cost:0.1") - test_rbf_kernel(ax3, cost=0.1) - ax4.set_title("rbf kernel svm,cost:500") - test_rbf_kernel(ax4, cost=500) - - sys.stdout = sys.__stdout__ - print("Plot done!!!") - - -def test_linear_kernel(ax, cost): - train_x, train_y = make_blobs( - n_samples=500, centers=2, n_features=2, random_state=1 - ) - train_y[train_y == 0] = -1 - scaler = StandardScaler() - train_x_scaled = scaler.fit_transform(train_x, train_y) - train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled)) - mykernel = Kernel(kernel="linear", degree=5, coef0=1, gamma=0.5) - mysvm = SmoSVM( - train=train_data, - kernel_func=mykernel, - cost=cost, - tolerance=0.001, - auto_norm=False, - ) - mysvm.fit() - plot_partition_boundary(mysvm, train_data, ax=ax) - - -def test_rbf_kernel(ax, cost): - train_x, train_y = make_circles( - n_samples=500, noise=0.1, factor=0.1, random_state=1 - ) - train_y[train_y == 0] = -1 - scaler = StandardScaler() - train_x_scaled = scaler.fit_transform(train_x, train_y) - train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled)) - mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5) - mysvm = SmoSVM( - train=train_data, - kernel_func=mykernel, - cost=cost, - tolerance=0.001, - auto_norm=False, - ) - mysvm.fit() - plot_partition_boundary(mysvm, train_data, ax=ax) - - -def plot_partition_boundary( - model, train_data, ax, resolution=100, colors=("b", "k", "r") -): - """ - We can not get the optimum w of our kernel svm model which is different from linear - svm. For this reason, we generate randomly distributed points with high desity and - prediced values of these points are calculated by using our tained model. Then we - could use this prediced values to draw contour map. - And this contour map can represent svm's partition boundary. - """ - train_data_x = train_data[:, 1] - train_data_y = train_data[:, 2] - train_data_tags = train_data[:, 0] - xrange = np.linspace(train_data_x.min(), train_data_x.max(), resolution) - yrange = np.linspace(train_data_y.min(), train_data_y.max(), resolution) - test_samples = np.array([(x, y) for x in xrange for y in yrange]).reshape( - resolution * resolution, 2 - ) - - test_tags = model.predict(test_samples, classify=False) - grid = test_tags.reshape((len(xrange), len(yrange))) - - # Plot contour map which represents the partition boundary - ax.contour( - xrange, - yrange, - np.mat(grid).T, - levels=(-1, 0, 1), - linestyles=("--", "-", "--"), - linewidths=(1, 1, 1), - colors=colors, - ) - # Plot all train samples - ax.scatter( - train_data_x, - train_data_y, - c=train_data_tags, - cmap=plt.cm.Dark2, - lw=0, - alpha=0.5, - ) - - # Plot support vectors - support = model.support - ax.scatter( - train_data_x[support], - train_data_y[support], - c=train_data_tags[support], - cmap=plt.cm.Dark2, - ) - - -if __name__ == "__main__": - test_cancel_data() - test_demonstration() - plt.show() +""" + Implementation of sequential minimal optimization (SMO) for support vector machines + (SVM). + + Sequential minimal optimization (SMO) is an algorithm for solving the quadratic + programming (QP) problem that arises during the training of support vector + machines. 
+    It was invented by John Platt in 1998.
+
+Input:
+    0: type: numpy.ndarray.
+    1: first column of ndarray must be tags of samples, must be 1 or -1.
+    2: rows of ndarray represent samples.
+
+Usage:
+    Command:
+        python3 sequential_minimum_optimization.py
+    Code:
+        from sequential_minimum_optimization import SmoSVM, Kernel
+
+        kernel = Kernel(kernel='poly', degree=3., coef0=1., gamma=0.5)
+        init_alphas = np.zeros(train.shape[0])
+        SVM = SmoSVM(train=train, alpha_list=init_alphas, kernel_func=kernel, cost=0.4,
+            b=0.0, tolerance=0.001)
+        SVM.fit()
+        predict = SVM.predict(test_samples)
+
+Reference:
+    https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/smo-book.pdf
+    https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/tr-98-14.pdf
+"""
+
+
+import os
+import sys
+import urllib.request
+
+import numpy as np
+import pandas as pd
+from matplotlib import pyplot as plt
+from sklearn.datasets import make_blobs, make_circles
+from sklearn.preprocessing import StandardScaler
+
+CANCER_DATASET_URL = (
+    "https://archive.ics.uci.edu/ml/machine-learning-databases/"
+    "breast-cancer-wisconsin/wdbc.data"
+)
+
+
+class SmoSVM:
+    def __init__(
+        self,
+        train,
+        kernel_func,
+        alpha_list=None,
+        cost=0.4,
+        b=0.0,
+        tolerance=0.001,
+        auto_norm=True,
+    ):
+        self._init = True
+        self._auto_norm = auto_norm
+        self._c = np.float64(cost)
+        self._b = np.float64(b)
+        self._tol = np.float64(tolerance) if tolerance > 0.0001 else np.float64(0.001)
+
+        self.tags = train[:, 0]
+        self.samples = self._norm(train[:, 1:]) if self._auto_norm else train[:, 1:]
+        self.alphas = alpha_list if alpha_list is not None else np.zeros(train.shape[0])
+        self.Kernel = kernel_func
+
+        self._eps = 0.001
+        self._all_samples = list(range(self.length))
+        self._K_matrix = self._calculate_k_matrix()
+        self._error = np.zeros(self.length)
+        self._unbound = []
+
+        self.choose_alpha = self._choose_alphas()
+
+    # Calculate alphas using the SMO algorithm
+    def fit(self):
+        k = self._k
+        state = None
+        while True:
+
+            # 1: find alpha1 and alpha2
+            try:
+                i1, i2 = self.choose_alpha.send(state)
+                state = None
+            except StopIteration:
+                print("Optimization done!\nEvery sample satisfies the KKT condition!")
+                break
+
+            # 2: calculate the new alpha2 and the new alpha1
+            y1, y2 = self.tags[i1], self.tags[i2]
+            a1, a2 = self.alphas[i1].copy(), self.alphas[i2].copy()
+            e1, e2 = self._e(i1), self._e(i2)
+            args = (i1, i2, a1, a2, e1, e2, y1, y2)
+            a1_new, a2_new = self._get_new_alpha(*args)
+            # _get_new_alpha signals failure with (None, None)
+            if a1_new is None and a2_new is None:
+                state = False
+                continue
+            self.alphas[i1], self.alphas[i2] = a1_new, a2_new
+
+            # 3: update the threshold b
+            b1_new = np.float64(
+                -e1
+                - y1 * k(i1, i1) * (a1_new - a1)
+                - y2 * k(i2, i1) * (a2_new - a2)
+                + self._b
+            )
+            b2_new = np.float64(
+                -e2
+                - y2 * k(i2, i2) * (a2_new - a2)
+                - y1 * k(i1, i2) * (a1_new - a1)
+                + self._b
+            )
+            if 0.0 < a1_new < self._c:
+                b = b1_new
+            if 0.0 < a2_new < self._c:
+                b = b2_new
+            if not (np.float64(0) < a2_new < self._c) and not (
+                np.float64(0) < a1_new < self._c
+            ):
+                b = (b1_new + b2_new) / 2.0
+            b_old = self._b
+            self._b = b
+
+            # 4: update the error values; only the errors of non-bound samples
+            # are recalculated here
+            self._unbound = [i for i in self._all_samples if self._is_unbound(i)]
+            for s in self.unbound:
+                if s in (i1, i2):
+                    continue
+                self._error[s] += (
+                    y1 * (a1_new - a1) * k(i1, s)
+                    + y2 * (a2_new - a2) * k(i2, s)
+                    + (self._b - b_old)
+                )
+
+            # if i1 or i2 is non-bound, reset its error value to zero
+            if self._is_unbound(i1):
+                self._error[i1] = 0
+            if self._is_unbound(i2):
+                self._error[i2] = 0
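# A minimal end-to-end sketch of the class above, assuming SmoSVM and Kernel
# are in scope and scikit-learn is available. The make_blobs labels are
# remapped to the 1/-1 tags that SmoSVM expects, with tags in the first column.
import numpy as np
from sklearn.datasets import make_blobs

x, y = make_blobs(n_samples=100, centers=2, n_features=2, random_state=0)
y[y == 0] = -1  # SmoSVM expects tags of 1 or -1
train = np.hstack((y.reshape(-1, 1), x))  # first column: tags
svm = SmoSVM(train=train, kernel_func=Kernel(kernel="linear"), cost=0.4)
svm.fit()
print(svm.predict(x[:5]))  # five 1/-1 class predictions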
+
+    # Predict test samples
+    def predict(self, test_samples, classify=True):
+
+        if test_samples.shape[1] > self.samples.shape[1]:
+            raise ValueError(
+                "Test samples have more features than the train samples"
+            )
+
+        if self._auto_norm:
+            test_samples = self._norm(test_samples)
+
+        results = []
+        for test_sample in test_samples:
+            result = self._predict(test_sample)
+            if classify:
+                results.append(1 if result > 0 else -1)
+            else:
+                results.append(result)
+        return np.array(results)
+
+    # Check whether alpha violates the KKT condition
+    def _check_obey_kkt(self, index):
+        alphas = self.alphas
+        tol = self._tol
+        r = self._e(index) * self.tags[index]
+        c = self._c
+
+        return (r < -tol and alphas[index] < c) or (r > tol and alphas[index] > 0.0)
+
+    # Get the value calculated from the kernel function
+    def _k(self, i1, i2):
+        # for test samples, use the kernel function
+        if isinstance(i2, np.ndarray):
+            return self.Kernel(self.samples[i1], i2)
+        # for train samples, kernel values have been saved in the matrix
+        else:
+            return self._K_matrix[i1, i2]
+
+    # Get a sample's error
+    def _e(self, index):
+        """
+        Two cases:
+            1: sample[index] is non-bound, fetch its error from the cache: _error
+            2: sample[index] is bound, use predicted value minus true value:
+               g(xi) - yi
+        """
+        # get from the error cache
+        if self._is_unbound(index):
+            return self._error[index]
+        # get by g(xi) - yi
+        else:
+            gx = np.dot(self.alphas * self.tags, self._K_matrix[:, index]) + self._b
+            yi = self.tags[index]
+            return gx - yi
+
+    # Calculate the kernel matrix of all possible (i1, i2) pairs to save time
+    def _calculate_k_matrix(self):
+        k_matrix = np.zeros([self.length, self.length])
+        for i in self._all_samples:
+            for j in self._all_samples:
+                k_matrix[i, j] = np.float64(
+                    self.Kernel(self.samples[i, :], self.samples[j, :])
+                )
+        return k_matrix
+
+    # Predict a test sample's tag
+    def _predict(self, sample):
+        k = self._k
+        predicted_value = (
+            np.sum(
+                [
+                    self.alphas[i1] * self.tags[i1] * k(i1, sample)
+                    for i1 in self._all_samples
+                ]
+            )
+            + self._b
+        )
+        return predicted_value
+
+    # Choose alpha1 and alpha2
+    def _choose_alphas(self):
+        locis = yield from self._choose_a1()
+        if not locis:
+            return
+        return locis
+
+    def _choose_a1(self):
+        """
+        Choose the first alpha; steps:
+        1: First, loop over all samples.
+        2: Second, loop over all non-bound samples until no non-bound sample
+           violates the KKT condition.
+        3: Repeat these two processes until no sample violates the KKT
+           condition after the first loop.
+        """
+        while True:
+            all_not_obey = True
+            # all samples
+            print("scanning all samples!")
+            for i1 in [i for i in self._all_samples if self._check_obey_kkt(i)]:
+                all_not_obey = False
+                yield from self._choose_a2(i1)
+
+            # non-bound samples
+            print("scanning non-bound samples!")
+            while True:
+                not_obey = True
+                for i1 in [
+                    i
+                    for i in self._all_samples
+                    if self._check_obey_kkt(i) and self._is_unbound(i)
+                ]:
+                    not_obey = False
+                    yield from self._choose_a2(i1)
+                if not_obey:
+                    print("all non-bound samples fit the KKT condition!")
+                    break
+            if all_not_obey:
+                print("all samples fit the KKT condition! Optimization done!")
+                break
+        return False
+
+    def _choose_a2(self, i1):
+        """
+        Choose the second alpha using a heuristic algorithm; steps:
+        1: Choose the alpha2 that gives the maximum step size (|E1 - E2|).
+        2: Start at a random point and loop over all non-bound samples until
+           alpha1 and alpha2 are optimized.
+        3: Start at a random point and loop over all samples until alpha1 and
+           alpha2 are optimized.
+        """
+        self._unbound = [i for i in self._all_samples if self._is_unbound(i)]
+
+        if len(self.unbound) > 0:
+            tmp_error = self._error.copy().tolist()
+            tmp_error_dict = {
+                index: value
+                for index, value in enumerate(tmp_error)
+                if self._is_unbound(index)
+            }
+            if self._e(i1) >= 0:
+                i2 = min(tmp_error_dict, key=lambda index: tmp_error_dict[index])
+            else:
+                i2 = max(tmp_error_dict, key=lambda index: tmp_error_dict[index])
+            cmd = yield i1, i2
+            if cmd is None:
+                return
+
+        for i2 in np.roll(self.unbound, np.random.choice(self.length)):
+            cmd = yield i1, i2
+            if cmd is None:
+                return
+
+        for i2 in np.roll(self._all_samples, np.random.choice(self.length)):
+            cmd = yield i1, i2
+            if cmd is None:
+                return
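# A toy illustration (hypothetical numbers) of the second-choice heuristic in
# _choose_a2 above: given an error cache for the non-bound samples, alpha2 is
# the index whose cached error maximizes the step size |E1 - E2|.
errors = {0: -0.8, 3: 0.1, 7: 0.9}  # hypothetical {index: E2} cache
e1 = 0.5
i2 = min(errors, key=errors.get) if e1 >= 0 else max(errors, key=errors.get)
print(i2)  # 0, since |E1 - E2| = |0.5 - (-0.8)| = 1.3 is the largest gap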
+ """ + self._unbound = [i for i in self._all_samples if self._is_unbound(i)] + + if len(self.unbound) > 0: + tmp_error = self._error.copy().tolist() + tmp_error_dict = { + index: value + for index, value in enumerate(tmp_error) + if self._is_unbound(index) + } + if self._e(i1) >= 0: + i2 = min(tmp_error_dict, key=lambda index: tmp_error_dict[index]) + else: + i2 = max(tmp_error_dict, key=lambda index: tmp_error_dict[index]) + cmd = yield i1, i2 + if cmd is None: + return + + for i2 in np.roll(self.unbound, np.random.choice(self.length)): + cmd = yield i1, i2 + if cmd is None: + return + + for i2 in np.roll(self._all_samples, np.random.choice(self.length)): + cmd = yield i1, i2 + if cmd is None: + return + + # Get the new alpha2 and new alpha1 + def _get_new_alpha(self, i1, i2, a1, a2, e1, e2, y1, y2): + k = self._k + if i1 == i2: + return None, None + + # calculate L and H which bound the new alpha2 + s = y1 * y2 + if s == -1: + l, h = max(0.0, a2 - a1), min(self._c, self._c + a2 - a1) + else: + l, h = max(0.0, a2 + a1 - self._c), min(self._c, a2 + a1) + if l == h: # noqa: E741 + return None, None + + # calculate eta + k11 = k(i1, i1) + k22 = k(i2, i2) + k12 = k(i1, i2) + + # select the new alpha2 which could get the minimal objectives + if (eta := k11 + k22 - 2.0 * k12) > 0.0: + a2_new_unc = a2 + (y2 * (e1 - e2)) / eta + # a2_new has a boundary + if a2_new_unc >= h: + a2_new = h + elif a2_new_unc <= l: + a2_new = l + else: + a2_new = a2_new_unc + else: + b = self._b + l1 = a1 + s * (a2 - l) + h1 = a1 + s * (a2 - h) + + # way 1 + f1 = y1 * (e1 + b) - a1 * k(i1, i1) - s * a2 * k(i1, i2) + f2 = y2 * (e2 + b) - a2 * k(i2, i2) - s * a1 * k(i1, i2) + ol = ( + l1 * f1 + + l * f2 + + 1 / 2 * l1**2 * k(i1, i1) + + 1 / 2 * l**2 * k(i2, i2) + + s * l * l1 * k(i1, i2) + ) + oh = ( + h1 * f1 + + h * f2 + + 1 / 2 * h1**2 * k(i1, i1) + + 1 / 2 * h**2 * k(i2, i2) + + s * h * h1 * k(i1, i2) + ) + """ + # way 2 + Use objective function check which alpha2 new could get the minimal + objectives + """ + if ol < (oh - self._eps): + a2_new = l + elif ol > oh + self._eps: + a2_new = h + else: + a2_new = a2 + + # a1_new has a boundary too + a1_new = a1 + s * (a2 - a2_new) + if a1_new < 0: + a2_new += s * a1_new + a1_new = 0 + if a1_new > self._c: + a2_new += s * (a1_new - self._c) + a1_new = self._c + + return a1_new, a2_new + + # Normalise data using min_max way + def _norm(self, data): + if self._init: + self._min = np.min(data, axis=0) + self._max = np.max(data, axis=0) + self._init = False + return (data - self._min) / (self._max - self._min) + else: + return (data - self._min) / (self._max - self._min) + + def _is_unbound(self, index): + if 0.0 < self.alphas[index] < self._c: + return True + else: + return False + + def _is_support(self, index): + if self.alphas[index] > 0: + return True + else: + return False + + @property + def unbound(self): + return self._unbound + + @property + def support(self): + return [i for i in range(self.length) if self._is_support(i)] + + @property + def length(self): + return self.samples.shape[0] + + +class Kernel: + def __init__(self, kernel, degree=1.0, coef0=0.0, gamma=1.0): + self.degree = np.float64(degree) + self.coef0 = np.float64(coef0) + self.gamma = np.float64(gamma) + self._kernel_name = kernel + self._kernel = self._get_kernel(kernel_name=kernel) + self._check() + + def _polynomial(self, v1, v2): + return (self.gamma * np.inner(v1, v2) + self.coef0) ** self.degree + + def _linear(self, v1, v2): + return np.inner(v1, v2) + self.coef0 + + def _rbf(self, v1, v2): + 
+
+
+class Kernel:
+    def __init__(self, kernel, degree=1.0, coef0=0.0, gamma=1.0):
+        self.degree = np.float64(degree)
+        self.coef0 = np.float64(coef0)
+        self.gamma = np.float64(gamma)
+        self._kernel_name = kernel
+        self._kernel = self._get_kernel(kernel_name=kernel)
+        self._check()
+
+    def _polynomial(self, v1, v2):
+        return (self.gamma * np.inner(v1, v2) + self.coef0) ** self.degree
+
+    def _linear(self, v1, v2):
+        return np.inner(v1, v2) + self.coef0
+
+    def _rbf(self, v1, v2):
+        return np.exp(-1 * (self.gamma * np.linalg.norm(v1 - v2) ** 2))
+
+    def _check(self):
+        if self._kernel == self._rbf:
+            if self.gamma < 0:
+                raise ValueError("gamma value must be greater than 0")
+
+    def _get_kernel(self, kernel_name):
+        maps = {"linear": self._linear, "poly": self._polynomial, "rbf": self._rbf}
+        return maps[kernel_name]
+
+    def __call__(self, v1, v2):
+        return self._kernel(v1, v2)
+
+    def __repr__(self):
+        return self._kernel_name
+
+
+def count_time(func):
+    def call_func(*args, **kwargs):
+        import time
+
+        start_time = time.time()
+        func(*args, **kwargs)
+        end_time = time.time()
+        print(f"smo algorithm cost {end_time - start_time} seconds")
+
+    return call_func
+
+
+@count_time
+def test_cancel_data():
+    print("Hello!\nStart testing SVM with the SMO algorithm!")
+    # 0: download the dataset and load it into a pandas dataframe
+    if not os.path.exists(r"cancel_data.csv"):
+        request = urllib.request.Request(
+            CANCER_DATASET_URL,
+            headers={"User-Agent": "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"},
+        )
+        response = urllib.request.urlopen(request)
+        content = response.read().decode("utf-8")
+        with open(r"cancel_data.csv", "w") as f:
+            f.write(content)
+
+    data = pd.read_csv(r"cancel_data.csv", header=None)
+
+    # 1: pre-process the data
+    del data[data.columns.tolist()[0]]
+    data = data.dropna(axis=0)
+    data = data.replace({"M": np.float64(1), "B": np.float64(-1)})
+    samples = np.array(data)[:, :]
+
+    # 2: divide the data into train_data and test_data
+    train_data, test_data = samples[:328, :], samples[328:, :]
+    test_tags, test_samples = test_data[:, 0], test_data[:, 1:]
+
+    # 3: choose a kernel function and set the initial alphas to zero (optional)
+    mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
+    al = np.zeros(train_data.shape[0])
+
+    # 4: calculate the best alphas with the SMO algorithm and predict the
+    # test_data samples
+    mysvm = SmoSVM(
+        train=train_data,
+        kernel_func=mykernel,
+        alpha_list=al,
+        cost=0.4,
+        b=0.0,
+        tolerance=0.001,
+    )
+    mysvm.fit()
+    predict = mysvm.predict(test_samples)
+
+    # 5: check the accuracy
+    score = 0
+    test_num = test_tags.shape[0]
+    for i in range(test_tags.shape[0]):
+        if test_tags[i] == predict[i]:
+            score += 1
+    print(f"\nall: {test_num}\nright: {score}\nwrong: {test_num - score}")
+    print(f"Rough Accuracy: {score / test_tags.shape[0]}")
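# A quick numeric sanity check of the Kernel class above, with hypothetical
# vectors: the rbf kernel K(v, w) = exp(-gamma * ||v - w|| ** 2) returns 1.0
# at zero distance and decays with squared distance.
import numpy as np

rbf = Kernel(kernel="rbf", gamma=0.5)
v, w = np.array([1.0, 2.0]), np.array([1.0, 3.0])
print(rbf(v, v))  # 1.0
print(rbf(v, w))  # exp(-0.5 * 1.0) ~= 0.6065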
+
+
+def test_demonstration():
+    # redirect stdout to devnull while plotting
+    print("\nStart plotting, please wait!!!")
+    sys.stdout = open(os.devnull, "w")
+
+    ax1 = plt.subplot2grid((2, 2), (0, 0))
+    ax2 = plt.subplot2grid((2, 2), (0, 1))
+    ax3 = plt.subplot2grid((2, 2), (1, 0))
+    ax4 = plt.subplot2grid((2, 2), (1, 1))
+    ax1.set_title("linear svm, cost: 0.1")
+    test_linear_kernel(ax1, cost=0.1)
+    ax2.set_title("linear svm, cost: 500")
+    test_linear_kernel(ax2, cost=500)
+    ax3.set_title("rbf kernel svm, cost: 0.1")
+    test_rbf_kernel(ax3, cost=0.1)
+    ax4.set_title("rbf kernel svm, cost: 500")
+    test_rbf_kernel(ax4, cost=500)
+
+    sys.stdout = sys.__stdout__
+    print("Plot done!!!")
+
+
+def test_linear_kernel(ax, cost):
+    train_x, train_y = make_blobs(
+        n_samples=500, centers=2, n_features=2, random_state=1
+    )
+    train_y[train_y == 0] = -1
+    scaler = StandardScaler()
+    train_x_scaled = scaler.fit_transform(train_x, train_y)
+    train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
+    mykernel = Kernel(kernel="linear", degree=5, coef0=1, gamma=0.5)
+    mysvm = SmoSVM(
+        train=train_data,
+        kernel_func=mykernel,
+        cost=cost,
+        tolerance=0.001,
+        auto_norm=False,
+    )
+    mysvm.fit()
+    plot_partition_boundary(mysvm, train_data, ax=ax)
+
+
+def test_rbf_kernel(ax, cost):
+    train_x, train_y = make_circles(
+        n_samples=500, noise=0.1, factor=0.1, random_state=1
+    )
+    train_y[train_y == 0] = -1
+    scaler = StandardScaler()
+    train_x_scaled = scaler.fit_transform(train_x, train_y)
+    train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
+    mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
+    mysvm = SmoSVM(
+        train=train_data,
+        kernel_func=mykernel,
+        cost=cost,
+        tolerance=0.001,
+        auto_norm=False,
+    )
+    mysvm.fit()
+    plot_partition_boundary(mysvm, train_data, ax=ax)
+
+
+def plot_partition_boundary(
+    model, train_data, ax, resolution=100, colors=("b", "k", "r")
+):
+    """
+    Unlike a linear SVM, we cannot obtain the optimum w of our kernel SVM model
+    directly. For this reason, we generate densely, randomly distributed points
+    and calculate their predicted values using our trained model. These
+    predicted values are then used to draw a contour map that represents the
+    SVM's partition boundary.
+    """
+    train_data_x = train_data[:, 1]
+    train_data_y = train_data[:, 2]
+    train_data_tags = train_data[:, 0]
+    xrange = np.linspace(train_data_x.min(), train_data_x.max(), resolution)
+    yrange = np.linspace(train_data_y.min(), train_data_y.max(), resolution)
+    test_samples = np.array([(x, y) for x in xrange for y in yrange]).reshape(
+        resolution * resolution, 2
+    )
+
+    test_tags = model.predict(test_samples, classify=False)
+    grid = test_tags.reshape((len(xrange), len(yrange)))
+
+    # Plot the contour map which represents the partition boundary
+    ax.contour(
+        xrange,
+        yrange,
+        np.mat(grid).T,
+        levels=(-1, 0, 1),
+        linestyles=("--", "-", "--"),
+        linewidths=(1, 1, 1),
+        colors=colors,
+    )
+    # Plot all train samples
+    ax.scatter(
+        train_data_x,
+        train_data_y,
+        c=train_data_tags,
+        cmap=plt.cm.Dark2,
+        lw=0,
+        alpha=0.5,
+    )
+
+    # Plot support vectors
+    support = model.support
+    ax.scatter(
+        train_data_x[support],
+        train_data_y[support],
+        c=train_data_tags[support],
+        cmap=plt.cm.Dark2,
+    )
+
+
+if __name__ == "__main__":
+    test_cancel_data()
+    test_demonstration()
+    plt.show()
diff --git a/strings/indian_phone_validator.py b/strings/indian_phone_validator.py
index 7f3fda5db..07161a63a 100644
--- a/strings/indian_phone_validator.py
+++ b/strings/indian_phone_validator.py
@@ -20,8 +20,7 @@ def indian_phone_validator(phone: str) -> bool:
     True
     """
     pat = re.compile(r"^(\+91[\-\s]?)?[0]?(91)?[789]\d{9}$")
-    match = re.search(pat, phone)
-    if match:
+    if match := re.search(pat, phone):
        return match.string == phone
    return False
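Every hunk in this patch applies the same Python >= 3.8 assignment-expression pattern that the auto-walrus hook automates: bind a value and test it in a single `if`. A minimal sketch of the pattern (the function and regex below are illustrative, not taken from the patch):

import re
from typing import Optional


def first_word(text: str) -> Optional[str]:
    # before: match = re.match(r"\w+", text); if match: ...
    if match := re.match(r"\w+", text):
        return match.group(0)
    return None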