import json
import os
import random
import re
import string
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
from xml.etree import ElementTree as ET

import requests
from bs4 import BeautifulSoup
from requests.exceptions import ConnectionError

letters = string.ascii_lowercase

# def uq(s):
#     return unquote(s).split("?")[0]
uq = unquote


def uqall(data):
    """Recursively percent-decode every string (and dict key) in a nested
    list/dict structure."""
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        for k in list(data.keys()):
            if "%" in k:
                data[uq(k)] = data.pop(k)
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list or dict")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            uqall(value)
        elif isinstance(value, str):
            if "%" in value:
                data[i] = uq(value)
    return data


def clear_whitespace(data):
    """Recursively collapse whitespace runs in every string of a nested
    list/dict structure."""
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list or dict")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            clear_whitespace(value)
        elif isinstance(value, str):
            data[i] = re.sub(r"\s+", " ", value).strip()
    return data


def randtime(a, b, k=0):
    """Return one random float in [a, b], or a list of k of them."""
    if k:
        return [random.uniform(a, b) for _ in range(k)]
    return random.uniform(a, b)


def remove_between(text, a, b):
    """Recursively delete every substring that starts with `a` and ends
    with `b` (delimiters included)."""
    otag_pos = 0
    ctag_pos = 0
    for i in range(len(text)):
        if text[i : i + len(a)] == a:
            otag_pos = i
        elif text[i : i + len(b)] == b:
            ctag_pos = i + len(b)
        if otag_pos and ctag_pos:
            return remove_between(text[:otag_pos] + text[ctag_pos:], a, b)
    return text.strip()


def remove_tag(text, tag="script"):
    """Recursively delete every `<tag ...>...</tag>` element from `text`."""
    otag = f"<{tag}"
    ctag = f"</{tag}>"
    otag_pos = 0
    ctag_pos = 0
    for i in range(len(text)):
        if text[i : i + len(otag)] == otag:
            otag_pos = i
        elif text[i : i + len(ctag)] == ctag:
            ctag_pos = i + len(ctag)
        if otag_pos and ctag_pos:
            return remove_tag(text[:otag_pos] + text[ctag_pos:], tag)
    return text


def all_text(e):
    return clear_whitespace(" ".join(e.itertext()))


def only_first_text(e):
    return clear_whitespace([next(e.itertext())])[0]


def only_text(e):
    return " ".join(all_text(e))


def url2str(url: str, clean=True) -> str:
    """Fetch `url` and return markup that xml.etree.ElementTree can parse."""
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    # bad_html = requests.get(url, headers=headers)
    bad_html = requests.get(url)
    if clean:
        tree = BeautifulSoup(bad_html.text, features="html.parser")
        xml_str = str(tree)
    else:
        xml_str = bad_html.text
    # crude repairs for malformed comment endings that make ElementTree choke
    xml_str = re.sub(r"[^!]--[^>]", "-->", xml_str)
    xml_str = re.sub(r"<>", "-->", xml_str)
    xml_str = remove_tag(xml_str, "head")
    # xml_str = remove_tag(xml_str)
    # with open("test.html", "w") as f:
    #     f.write(xml_str)
    return xml_str


# aliases
rb = remove_between
cw = clear_whitespace
ot = only_text
at = all_text
oft = only_first_text


class WordParser:
    """Base class for site-specific parsers.

    Subclasses need two additional members to work with Queue:
    - self.neighbours: words found on the fetched page
    - self.todict(): a dict with the parsed info
    """

    def __init__(self, word, url_prefix, clean=True):
        self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.word = uq(word)
        self.url = f"{url_prefix}{word}"
        self.xml_string = url2str(self.url, clean=clean)
        self.root = ET.fromstring(self.xml_string)
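
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal WordParser
# subclass of the shape Queue expects.  The URL prefix, the link-harvesting
# logic, and the {word: text} payload are illustrative assumptions, not a
# real dictionary-site scraper.
class ExampleParser(WordParser):
    def __init__(self, word):
        # hypothetical endpoint; swap in a real dictionary URL prefix
        super().__init__(word, url_prefix="https://example.org/dictionary/")
        # Queue.add_word feeds these back into the crawl queue
        self.neighbours = [
            uq(a.get("href", "").rsplit("/", 1)[-1])
            for a in self.root.iter("a")
        ]

    def todict(self):
        # Queue.add_word merges this mapping into an on-disk DictFile shard
        return {self.word: ot(self.root)}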
{""} def load(self): if os.path.isfile(self.file): self.update({line.strip() for line in open(self.file, "r")}) else: super() def save(self, sort=False): if self: with open(self.file, "w") as f: if sort: f.write("\n".join([w for w in sorted(self) if w])) else: f.write("\n".join([w for w in self if w])) def append(self): if self and os.path.isfile(self.file): self |= {line.strip() for line in open(self.file, "r")} self.save() class DictFile(dict): def __init__(self, file): self.file = file if os.path.isfile(self.file): with open(self.file, "r") as f: super().__init__(json.load(f)) else: super() def save(self): with open(self.file, "w") as f: json.dump(self, f, separators=(",", ":"), indent=2, sort_keys=False) class FullDictionary(dict): def __init__(self, dir_prefix, suffix): self.__dict__.update(locals()) full_dict = {} start = time.time() for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"): with open(db_file, "r") as f: full_dict |= json.load(f) self.readtime = time.time() - start super().__init__(full_dict) del full_dict class Queue: def __init__( self, Parser, dir_prefix, suffix, time_base=1.01, time_exponent=10, prefix_length=3, ): self.__dict__.update(locals()) self.words = FileSet(f"{dir_prefix}words") self.queue = FileSet(f"{dir_prefix}queue") self.snafus = FileSet(f"{dir_prefix}snafus") self.redo = FileSet(f"{dir_prefix}redo") self.unusual = ( lambda prefix: not all([c in letters for c in prefix.lower()]) or len(prefix) < self.prefix_length ) def wait(self): if int(time.time()) % 10 == 0: # cron job self.words.save(sort=True) self.queue.save() self.time_exponent = abs(self.time_exponent) a = self.time_base**self.time_exponent b = self.time_base ** (self.time_exponent * 3) time.sleep(randtime(a, b)) def loadDB(self): for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"): with open(db_file, "r") as f: try: self.words |= set(json.load(f).keys()) except json.decoder.JSONDecodeError: print(db_file, " corrupted") exit() def pick_random(self): self.redo.load() self.queue -= self.words self.queue -= self.snafus self.queue |= self.redo if len(self.queue) < 1: p = random.choice(list(self.words)) self.time_exponent += 1 else: p = random.choice(list(self.queue)) self.time_exponent -= 20 return p def add_word(self, p=None): if p == None: p = self.pick_random() try: w = self.Parser(p) # fetch new word print(p) word_dict = w.todict() prefix = p[: self.prefix_length].lower() if self.unusual(prefix): prefix = "_" * self.prefix_length dict_part = DictFile(f"{self.dir_prefix}{prefix}{self.suffix}") dict_part |= word_dict dict_part.save() del dict_part self.words |= set(word_dict.keys()) self.queue |= set(w.neighbours) self.queue -= {p} self.redo -= {p} self.redo.save() self.wait() except ( AssertionError, ET.ParseError, ): self.queue.save() print("snafu... ", p) self.redo -= {p} self.redo.save() self.snafus |= {p} self.snafus.append() self.wait() except ConnectionError: self.queue.save() self.time_exponent += 1 self.wait() if __name__ == "__main__": d = collect_words("en_MerriamWebster/", "_MW.json") print(len(set(d))) # print(d.readtime) time.sleep(3) print("del") del d time.sleep(3)