dict_dl/dict_dl.py
2022-07-11 08:10:43 +02:00

293 lines
7.9 KiB
Python

import json
import os
import random
import re
import string
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
from xml.etree import ElementTree as ET
import requests
from bs4 import BeautifulSoup
from requests.exceptions import ConnectionError
letters = string.ascii_lowercase
# def uq(s):
# return unquote(s).split("?")[0]
uq = unquote
def uqall(data):
if isinstance(data, list):
iterator = enumerate(data)
elif isinstance(data, dict):
for k in list(data.keys()):
if "%" in k:
data[uq(k)] = data.pop(k)
iterator = data.items()
elif isinstance(data, str):
data = [data]
iterator = enumerate(data)
else:
raise TypeError("can only traverse list or dict")
for i, value in iterator:
if isinstance(value, (list, dict)):
uqall(value)
elif isinstance(value, str):
if "%" in value:
data[i] = uq(value)
return data
def clear_whitespace(data):
if isinstance(data, list):
iterator = enumerate(data)
elif isinstance(data, dict):
iterator = data.items()
elif isinstance(data, str):
data = [data]
iterator = enumerate(data)
else:
raise TypeError("can only traverse list or dict")
for i, value in iterator:
if isinstance(value, (list, dict)):
clear_whitespace(value)
elif isinstance(value, str):
data[i] = re.sub(r"[\n\t\s]+", " ", value).strip()
return data
def randtime(a, b, k=0):
if k:
return [random.uniform(a, b) for _ in range(k)]
else:
return random.uniform(a, b)
def remove_between(string, a, b):
otag_pos = 0
ctag_pos = 0
for i in range(len(string)):
if string[i : i + len(a)] == a:
otag_pos = i
elif string[i : i + len(b)] == b:
ctag_pos = i + len(b)
if otag_pos and ctag_pos:
return remove_between(string[:otag_pos] + string[ctag_pos:], a, b)
return string.strip()
def remove_tag(string, tag="script"):
otag = f"<{tag}"
ctag = f"</{tag}>"
otag_pos = 0
ctag_pos = 0
for i in range(len(string)):
if string[i : i + len(otag)] == otag:
otag_pos = i
elif string[i : i + len(ctag)] == ctag:
ctag_pos = i + len(ctag)
if otag_pos and ctag_pos:
return remove_tag(string[:otag_pos] + string[ctag_pos:], tag)
return string
def all_text(e):
return clear_whitespace(" ".join(e.itertext()))
def only_first_text(e):
return clear_whitespace([next(e.itertext())])[0]
def only_text(e):
return " ".join(all_text(e))
def url2str(url: str) -> str:
headers = {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
}
# bad_html = requests.get(url, headers=headers)
bad_html = requests.get(url)
tree = BeautifulSoup(bad_html.text, features="lxml")
xml_str = str(tree)
xml_str = remove_tag(xml_str, "head")
xml_str = remove_tag(xml_str)
# with open("test.html", "w") as f:
# f.write(xml_str)
return xml_str
# aliases
rb = remove_between
cw = clear_whitespace
ot = only_text
at = all_text
oft = only_first_text
class WordParser:
"""WordParser needs additional methods to work with Queue:
- self.neighbours = words found on the site
- self.todict() = returning a dict with the parsed info"""
def __init__(self, word, url_prefix):
self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
self.word = word
self.url = f"{url_prefix}{word}"
self.xml_string = url2str(self.url)
self.root = ET.fromstring(self.xml_string)
class FileSet(set):
def __init__(self, file):
self.file = file
super().__init__({line.strip() for line in open(self.file, "r")})
def load(self):
self.update({line.strip() for line in open(self.file, "r")})
def save(self):
if self:
with open(self.file, "w") as f:
f.write("\n".join([w for w in self if w]))
def append(self):
if self:
self |= {line.strip() for line in open(self.file, "r")}
self.save()
class DictFile(dict):
def __init__(self, file):
self.file = file
if os.path.isfile(self.file):
with open(self.file, "r") as f:
super().__init__(json.load(f))
else:
super()
def save(self):
with open(self.file, "w") as f:
json.dump(self, f, separators=(",", ":"), indent=2, sort_keys=False)
class FullDictionary(dict):
def __init__(self, dir_prefix, suffix):
self.__dict__.update(locals())
full_dict = {}
start = time.time()
for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
with open(db_file, "r") as f:
full_dict |= json.load(f)
self.readtime = time.time() - start
super().__init__(full_dict)
del full_dict
class Queue:
def __init__(
self,
Parser,
dir_prefix,
suffix,
time_base=1.01,
time_exponent=10,
prefix_length=3,
):
self.__dict__.update(locals())
self.words = set()
self.queue = FileSet(f"{dir_prefix}queue")
self.snafus = FileSet(f"{dir_prefix}snafus")
self.redo = FileSet(f"{dir_prefix}redo")
self.unusual = lambda prefix: not all([c in letters for c in prefix.lower()]) or len(prefix) < self.prefix_length
def wait(self):
self.time_exponent = abs(self.time_exponent)
a = self.time_base**self.time_exponent
b = self.time_base ** (self.time_exponent * 3)
time.sleep(randtime(a, b))
def loadDB(self):
for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
with open(db_file, "r") as f:
self.words |= set(json.load(f).keys())
def pick_random(self):
self.redo.load()
self.queue -= self.words
self.queue -= self.snafus
self.queue |= self.redo
if len(self.queue) < 1:
p = random.choice(list(self.words))
self.time_exponent += 1
else:
p = random.choice(list(self.queue))
self.time_exponent -= 20
else:
return p
def add_word(self, p=None):
if p == None:
p = self.pick_random()
try:
start_parsing = time.time()
w = self.Parser(p) # fetch new word
word_dict = w.todict()
print(
f"{p} | "
f"{len(self.words)} words collected, "
f"{len(self.queue)} words waiting in queue"
# f", {start_db_stuff-start_parsing:.06f}s"
# f"/{time.time() - start_db_stuff:.06f}s"
)
start_db_stuff = time.time()
prefix = p[: self.prefix_length].lower()
if self.unusual(prefix):
prefix = "_" * self.prefix_length
dict_part = DictFile(f"{self.dir_prefix}{prefix}{self.suffix}")
dict_part |= word_dict
dict_part.save()
del dict_part
self.words |= set(word_dict.keys())
self.queue |= set(w.neighbours)
self.queue -= {p}
self.redo -= {p}
self.redo.save()
self.wait()
except (
AssertionError,
ET.ParseError,
):
self.queue.save()
print("snafu... ", p)
self.redo -= {p}
self.redo.save()
self.snafus |= {p}
self.snafus.append()
self.wait()
except ConnectionError:
self.queue.save()
self.time_exponent += 1
self.wait()
if __name__ == "__main__":
d = collect_words("en_MerriamWebster/", "_MW.json")
print(len(set(d)))
# print(d.readtime)
time.sleep(3)
print("del")
del d
time.sleep(3)