# File: compression/lempel_ziv.py
# Added by the patch "Add lempel ziv compression (#2107)".
"""
    One of the several implementations of Lempel–Ziv–Welch compression algorithm
    https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import os
import sys


def read_file_binary(file_path: str) -> str:
    """
    Reads given file as bytes and returns them as one long string of "0"/"1"
    characters (8 characters per byte, most significant bit first).

    Prints "File not accessible" and exits the interpreter if the file cannot
    be read.
    """
    try:
        with open(file_path, "rb") as binary_file:
            data = binary_file.read()
    except IOError:
        print("File not accessible")
        sys.exit()
    # A single join avoids re-copying the growing string once per byte.
    return "".join(format(byte, "08b") for byte in data)


def add_key_to_lexicon(
    lexicon: dict, curr_string: str, index: int, last_match_id: str
) -> None:
    """
    Replaces the matched phrase curr_string in the lexicon by its two one-bit
    extensions (curr_string + "0", curr_string + "1").

    lexicon is modified in place. curr_string + "0" inherits the code word
    last_match_id, curr_string + "1" receives the binary representation of index.
    """
    lexicon.pop(curr_string)
    lexicon[curr_string + "0"] = last_match_id

    # When index reaches a power of two its binary form is one bit longer than
    # every existing code word, so all of them are left-padded with a "0" to
    # keep the code words the same length.
    if math.log2(index).is_integer():
        for curr_key in lexicon:
            lexicon[curr_key] = "0" + lexicon[curr_key]

    lexicon[curr_string + "1"] = bin(index)[2:]


def compress_data(data_bits: str) -> str:
    """
    Compresses given data_bits using Lempel–Ziv–Welch compression algorithm
    and returns the result as a string of "0"/"1" characters.

    Raises ValueError if data_bits contains anything other than "0" and "1".
    """
    # Any other character can never match a lexicon key, which would make the
    # zero-padding loop at the end of this function spin forever.
    if data_bits.strip("01"):
        raise ValueError("data_bits must only consist of 0's and 1's")

    lexicon = {"0": "0", "1": "1"}
    code_words = []
    curr_string = ""
    index = len(lexicon)

    for bit in data_bits:
        curr_string += bit
        if curr_string not in lexicon:
            continue

        last_match_id = lexicon[curr_string]
        code_words.append(last_match_id)
        add_key_to_lexicon(lexicon, curr_string, index, last_match_id)
        index += 1
        curr_string = ""

    # Flush an unfinished phrase: extend it with zeros until it is a lexicon key.
    while curr_string != "" and curr_string not in lexicon:
        curr_string += "0"

    if curr_string != "":
        code_words.append(lexicon[curr_string])

    return "".join(code_words)


def add_file_length(source_path: str, compressed: str) -> str:
    """
    Adds given file's length in front (using Elias gamma coding) of the compressed
    string and returns the result.

    The length is the size in bytes reported by os.path.getsize(source_path).
    """
    file_length = os.path.getsize(source_path)
    file_length_binary = bin(file_length)[2:]
    length_length = len(file_length_binary)

    return "0" * (length_length - 1) + file_length_binary + compressed


def write_file_binary(file_path: str, to_write: str) -> None:
    """
    Writes given to_write string (should only consist of 0's and 1's) as bytes in the
    file.

    An end marker is always appended first: a "1" followed by as many "0" as
    needed to reach a whole number of bytes (a full "10000000" byte when
    to_write already is byte aligned, including when it is empty).

    Prints "File not accessible" and exits the interpreter if the file cannot
    be written.
    """
    byte_length = 8
    # Between 1 and byte_length marker bits are required.
    marker_length = byte_length - len(to_write) % byte_length
    padded = to_write + "1" + "0" * (marker_length - 1)
    # Build the payload before opening the file so that invalid input does not
    # leave a truncated destination behind.
    payload = bytes(
        int(padded[i : i + byte_length], 2)
        for i in range(0, len(padded), byte_length)
    )
    try:
        with open(file_path, "wb") as opened_file:
            opened_file.write(payload)
    except IOError:
        print("File not accessible")
        sys.exit()


def compress(source_path: str, destination_path: str) -> None:
    """
    Reads source file, compresses it and writes the compressed result in destination
    file
    """
    data_bits = read_file_binary(source_path)
    compressed = compress_data(data_bits)
    compressed = add_file_length(source_path, compressed)
    write_file_binary(destination_path, compressed)


if __name__ == "__main__":
    compress(sys.argv[1], sys.argv[2])
# File: compression/lempel_ziv_decompress.py
# Added by the patch "Add lempel ziv compression (#2107)".
"""
    One of the several implementations of Lempel–Ziv–Welch decompression algorithm
    https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import sys


def read_file_binary(file_path: str) -> str:
    """
    Reads given file as bytes and returns them as one long string of "0"/"1"
    characters (8 characters per byte, most significant bit first).

    Prints "File not accessible" and exits the interpreter if the file cannot
    be read.
    """
    try:
        with open(file_path, "rb") as binary_file:
            data = binary_file.read()
    except IOError:
        print("File not accessible")
        sys.exit()
    # A single join avoids re-copying the growing string once per byte.
    return "".join(format(byte, "08b") for byte in data)


def decompress_data(data_bits: str) -> str:
    """
    Decompresses given data_bits using Lempel–Ziv–Welch compression algorithm
    and returns the result as a string of "0"/"1" characters.

    Trailing bits that do not form a complete code word are ignored.
    """
    lexicon = {"0": "0", "1": "1"}
    phrases = []
    curr_string = ""
    index = len(lexicon)

    for bit in data_bits:
        curr_string += bit
        if curr_string not in lexicon:
            continue

        last_match_id = lexicon[curr_string]
        phrases.append(last_match_id)
        # The matched code word now stands for the phrase extended by "0" ...
        lexicon[curr_string] = last_match_id + "0"

        # When index reaches a power of two the new code word is one bit longer
        # than the existing ones, so every existing code word is left-padded.
        if math.log2(index).is_integer():
            lexicon = {"0" + code: phrase for code, phrase in lexicon.items()}

        # ... and a fresh code word stands for the phrase extended by "1".
        lexicon[bin(index)[2:]] = last_match_id + "1"
        index += 1
        curr_string = ""

    return "".join(phrases)


def write_file_binary(file_path: str, to_write: str) -> None:
    """
    Writes given to_write string (should only consist of 0's and 1's) as bytes in the
    file.

    Only complete groups of 8 bits are written; a trailing group of fewer than
    8 bits is dropped. An empty to_write produces an empty file.

    Prints "File not accessible" and exits the interpreter if the file cannot
    be written.
    """
    byte_length = 8
    complete_bits = len(to_write) - len(to_write) % byte_length
    # Build the payload before opening the file so that invalid input does not
    # leave a truncated destination behind.
    payload = bytes(
        int(to_write[i : i + byte_length], 2)
        for i in range(0, complete_bits, byte_length)
    )
    try:
        with open(file_path, "wb") as opened_file:
            opened_file.write(payload)
    except IOError:
        print("File not accessible")
        sys.exit()


def _leading_zero_count(data_bits: str) -> int:
    """Returns how many characters precede the first "1" (all of them if none)."""
    first_one = data_bits.find("1")
    return len(data_bits) if first_one == -1 else first_one


def _read_original_size(data_bits: str) -> int:
    """
    Returns the number stored in the size prefix, or -1 when data_bits is too
    short to hold the whole prefix.
    """
    zeros = _leading_zero_count(data_bits)
    size_bits = data_bits[zeros : 2 * zeros + 1]
    if len(size_bits) != zeros + 1:
        return -1
    return int(size_bits, 2)


def remove_prefix(data_bits: str) -> str:
    """
    Removes size prefix, that compressed file should have
    Returns the result

    The prefix is n leading zeros followed by an (n + 1)-bit number, so
    2 * n + 1 bits are skipped in total.
    """
    counter = _leading_zero_count(data_bits)
    return data_bits[2 * counter + 1 :]


def compress(source_path: str, destination_path: str) -> None:
    """
    Reads source file, decompresses it and writes the result in destination file

    The function keeps its historical name; `decompress` is an alias for it.
    """
    data_bits = read_file_binary(source_path)
    original_size = _read_original_size(data_bits)
    decompressed = decompress_data(remove_prefix(data_bits))
    # The decoded stream also contains the zeros used to flush the last phrase
    # and whatever the end-of-file marker byte decodes to. Cut the result back
    # to the size (in bytes) recorded in the prefix so those bits are not
    # written out as extra bytes.
    if original_size >= 0:
        decompressed = decompressed[: original_size * 8]
    write_file_binary(destination_path, decompressed)


# Descriptive alias; the original entry point name is kept for existing callers.
decompress = compress


if __name__ == "__main__":
    compress(sys.argv[1], sys.argv[2])