"""Scraper utilities for building a word dictionary from an online source.

Fetches word pages, strips the HTML down to something XML-parseable, and
maintains file-backed sets of words still to crawl.  Collected entries are
sharded into JSON files keyed by word prefix.
"""

import json
import os
import random
import re
import string
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
from xml.etree import ElementTree as ET

import requests
from bs4 import BeautifulSoup
from requests.exceptions import ConnectionError

letters = string.ascii_lowercase


def unusual(prefix):
    """Return True when *prefix* contains anything besides ASCII letters."""
    return not all(c in letters for c in prefix.lower())


# URL-decoding helper; an earlier revision also dropped the query string:
# def uq(s):
#     return unquote(s).split("?")[0]
uq = unquote


def uqall(data):
    """Recursively percent-decode every string (and dict key) in *data*.

    Accepts a list, dict, or str; containers are mutated in place and the
    (possibly list-wrapped) data is returned.  Raises TypeError otherwise.
    """
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        # Decode keys first; list() lets us pop while iterating.
        for k in list(data.keys()):
            if "%" in k:
                data[uq(k)] = data.pop(k)
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list or dict")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            uqall(value)
        elif isinstance(value, str):
            if "%" in value:
                data[i] = uq(value)
    return data


def clear_whitespace(data):
    """Recursively collapse whitespace runs to single spaces in *data*.

    Accepts a list, dict, or str; containers are mutated in place and the
    (possibly list-wrapped) data is returned.  Raises TypeError otherwise.
    """
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list or dict")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            clear_whitespace(value)
        elif isinstance(value, str):
            # \s already covers \n and \t.
            data[i] = re.sub(r"\s+", " ", value).strip()
    return data


def randtime(a, b, k=0):
    """Uniform random float in [a, b]; a list of *k* such floats when k > 0."""
    if k:
        return [random.uniform(a, b) for _ in range(k)]
    return random.uniform(a, b)


def remove_between(string, a, b):
    """Remove every span that starts with *a* and ends with *b* (inclusive).

    Fixed: the close marker must occur *after* the open marker (the old
    scan could splice with ctag_pos < otag_pos and duplicate the middle),
    and a match at index 0 is no longer missed.  Returns the stripped
    remainder.
    """
    start = string.find(a)
    if start != -1:
        end = string.find(b, start)
        if end != -1:
            return remove_between(string[:start] + string[end + len(b):], a, b)
    return string.strip()


def remove_tag(string, tag="script"):
    """Remove every <tag ...>...</tag> element from an HTML/XML string.

    Fixed: the closing marker was the empty string, which made the old
    scan delete a single character instead of the tagged span.
    """
    otag = f"<{tag}"
    ctag = f"</{tag}>"
    start = string.find(otag)
    if start != -1:
        end = string.find(ctag, start)
        if end != -1:
            return remove_tag(string[:start] + string[end + len(ctag):], tag)
    return string


def all_text(e):
    """All text inside element *e*, whitespace-normalised (a one-item list)."""
    return clear_whitespace(" ".join(e.itertext()))


def only_first_text(e):
    """First text fragment inside element *e*, whitespace-normalised."""
    return clear_whitespace([next(e.itertext())])[0]


def only_text(e):
    """All text inside element *e* as a single normalised string."""
    return " ".join(all_text(e))


def url2str(url: str) -> str:
    """Fetch *url* and return its HTML with <head> and <script> removed.

    The browser-style user-agent is now actually sent (the dict was built
    but unused before), and a timeout prevents the request hanging forever.
    """
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    bad_html = requests.get(url, headers=headers, timeout=30)
    tree = BeautifulSoup(bad_html.text, features="lxml")
    xml_str = str(tree)
    xml_str = remove_tag(xml_str, "head")
    xml_str = remove_tag(xml_str)
    # with open("test.html", "w") as f:
    #     f.write(xml_str)
    return xml_str


# short aliases
rb = remove_between
cw = clear_whitespace
ot = only_text
at = all_text
oft = only_first_text


class WordParser:
    """WordParser needs additional methods to work with Queue:

    - self.neighbours = words found on the site
    - self.todict() = returning a dict with the parsed info
    """

    def __init__(self, word, url_prefix):
        self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.word = word
        self.url = f"{url_prefix}{word}"
        self.xml_string = url2str(self.url)
        self.root = ET.fromstring(self.xml_string)


class FileSet(set):
    """A set of words mirrored to a newline-delimited text file."""

    def __init__(self, file):
        self.file = file
        with open(self.file, "r") as f:
            super().__init__({line.strip() for line in f})

    def load(self):
        """Merge the words currently on disk into this set."""
        with open(self.file, "r") as f:
            self.update(line.strip() for line in f)

    def save(self):
        """Write the set to disk; no-op when empty, blank entries dropped."""
        if self:
            with open(self.file, "w") as f:
                f.write("\n".join(w for w in self if w))

    def append(self):
        """Union this set with the on-disk contents, then persist it."""
        if self:
            with open(self.file, "r") as f:
                self |= {line.strip() for line in f}
            self.save()


class DictFile(dict):
    """A dict persisted as a JSON file; loads the file on creation if present."""

    def __init__(self, file):
        self.file = file
        if os.path.isfile(self.file):
            with open(self.file, "r") as f:
                super().__init__(json.load(f))
        else:
            # Was a bare `super()` expression (a no-op); initialise empty.
            super().__init__()

    def save(self):
        """Write the dict back to its JSON file (compact separators, indented)."""
        with open(self.file, "w") as f:
            json.dump(self, f, separators=(",", ":"), indent=2, sort_keys=False)


class FullDictionary(dict):
    """All JSON shards under *dir_prefix* matching *suffix*, merged into one dict."""

    def __init__(self, dir_prefix, suffix):
        self.dir_prefix = dir_prefix
        self.suffix = suffix
        full_dict = {}
        start = time.time()
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                full_dict |= json.load(f)
        # Seconds spent reading the shards, for diagnostics.
        self.readtime = time.time() - start
        super().__init__(full_dict)


class Queue:
    """Crawl driver: pops words from a file-backed queue, parses each with
    *Parser*, shards the results into JSON files, and queues newly found
    neighbour words.

    *Parser* must be callable as Parser(word) and expose ``.todict()`` and
    ``.neighbours`` (see WordParser's docstring).
    """

    def __init__(
        self,
        Parser,
        dir_prefix,
        suffix,
        time_base=1.01,
        time_exponent=10,
        prefix_length=3,
    ):
        # Explicit assignments instead of __dict__.update(locals()),
        # which also stored a self.self reference cycle.
        self.Parser = Parser
        self.dir_prefix = dir_prefix
        self.suffix = suffix
        self.time_base = time_base
        self.time_exponent = time_exponent
        self.prefix_length = prefix_length
        self.words = set()
        self.queue = FileSet(f"{dir_prefix}queue")
        self.snafus = FileSet(f"{dir_prefix}snafus")
        self.redo = FileSet(f"{dir_prefix}redo")

    def wait(self):
        """Sleep a random polite interval; grows as time_exponent is raised."""
        a = self.time_base**self.time_exponent
        b = self.time_base ** (self.time_exponent * 3)
        time.sleep(randtime(a, b))

    def loadDB(self):
        """Load all previously collected words so they are not re-fetched."""
        d = FullDictionary(self.dir_prefix, self.suffix)
        self.words |= set(d.keys())
        print(d.readtime)

    def add_word(self):
        """Fetch, parse, and store one random word from the queue.

        Parse failures move the word to the snafu list; connection errors
        raise the backoff exponent.  Both paths persist the queue first.
        """
        self.redo.load()
        self.queue -= self.words
        self.queue -= self.snafus
        self.queue |= self.redo
        len_queue = len(self.queue)  # actual queue
        p = random.choice(list(self.queue))
        try:
            start_parsing = time.time()
            w = self.Parser(p)  # fetch new word
            word_dict = w.todict()
            print(
                f"{p} | "
                f"{len(self.words)} words collected, "
                f"{len_queue} words waiting in queue"
                # f", {start_db_stuff-start_parsing:.06f}s"
                # f"/{time.time() - start_db_stuff:.06f}s"
            )
            start_db_stuff = time.time()
            prefix = p[: self.prefix_length].lower()
            if unusual(prefix):
                # Non-alphabetic prefixes all share one catch-all shard.
                prefix = "_" * self.prefix_length
            dict_part = DictFile(f"{self.dir_prefix}{prefix}{self.suffix}")
            dict_part |= word_dict
            dict_part.save()
            del dict_part
            self.words |= set(word_dict.keys())
            self.queue |= set(w.neighbours)
            self.queue -= {p}
            self.redo -= {p}
            self.redo.save()
            self.wait()
        except (
            AssertionError,
            ET.ParseError,
        ):
            self.queue.save()
            print("snafu... ", p)
            self.redo -= {p}
            self.redo.save()
            self.snafus |= {p}
            self.snafus.append()
            self.wait()
        except ConnectionError:
            self.queue.save()
            # Back off harder on connection trouble.
            self.time_exponent += 1
            self.wait()


if __name__ == "__main__":
    f = FileSet("en_merriam_webster/queue")
    # d = DictFile("en_merriam_webster/abc_mw.json")
    # d.save()
    # Was print(d): NameError — d exists only in the commented lines above.
    print(f)