# dict_dl/dict_dl.py

import json
import random
import re
import string
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote as uq
from xml.etree import ElementTree as ET

import requests
from bs4 import BeautifulSoup
from requests.exceptions import ConnectionError  # shadows the builtin of the same name


def uqall(data):
    """Recursively percent-decode (urllib.parse.unquote) every str key and
    value in a nested list/dict; a bare str comes back as a one-element list."""
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        for k in list(data.keys()):
            data[uq(k)] = data.pop(k)
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list, dict or str")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            uqall(value)
        elif isinstance(value, str):
            data[i] = uq(value)
    return data
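
# Example: percent-encoded keys and values are decoded in place (a sketch of
# typical input, not data from this repo):
#   >>> uqall({"f%C3%BChlen": ["f%C3%BChlte"]})
#   {'fühlen': ['fühlte']}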


def clear_whitespace(data):
    """Recursively collapse runs of whitespace to single spaces in every str
    of a nested list/dict; a bare str comes back as a one-element list."""
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list, dict or str")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            clear_whitespace(value)
        elif isinstance(value, str):
            data[i] = re.sub(r"\s+", " ", value).strip()  # \s covers \n and \t
    return data
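
# Example: nested containers are cleaned in place:
#   >>> clear_whitespace(["  a\n\tb  ", {"k": " c   d "}])
#   ['a b', {'k': 'c d'}]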


def randtime(a, b, k=0):
    """Uniform random float in [a, b]; with k > 0, a list of k such floats."""
    if k:
        return [random.uniform(a, b) for _ in range(k)]
    else:
        return random.uniform(a, b)
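
# Example: randtime(1.0, 2.0) returns one float in [1.0, 2.0];
# randtime(1.0, 2.0, k=3) returns a list of three. Queue.wait below uses the
# single-value form for polite crawling delays.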


def remove_between(string, a, b):
    """Recursively delete every span from marker a through the end of marker b
    (None sentinels so a marker at index 0 is still found)."""
    otag_pos = None
    ctag_pos = None
    for i in range(len(string)):
        if string[i : i + len(a)] == a:
            otag_pos = i
        elif string[i : i + len(b)] == b:
            ctag_pos = i + len(b)
        if otag_pos is not None and ctag_pos is not None:
            # cut the span out and scan the shortened string again
            return remove_between(string[:otag_pos] + string[ctag_pos:], a, b)
    return string.strip()


def remove_tag(string, tag="script"):
    """Recursively delete every <tag ...>...</tag> block from an HTML string."""
    otag = f"<{tag}"
    ctag = f"</{tag}>"
    otag_pos = None
    ctag_pos = None
    for i in range(len(string)):
        if string[i : i + len(otag)] == otag:
            otag_pos = i
        elif string[i : i + len(ctag)] == ctag:
            ctag_pos = i + len(ctag)
        if otag_pos is not None and ctag_pos is not None:
            return remove_tag(string[:otag_pos] + string[ctag_pos:], tag)
    return string
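
# Example: script blocks disappear, surrounding markup stays:
#   >>> remove_tag("<p>hi</p><script>alert(1)</script>")
#   '<p>hi</p>'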


def all_text(e):
    return clear_whitespace(" ".join(e.itertext()))  # -> one-element list


def only_first_text(e):
    return clear_whitespace([next(e.itertext())])[0]  # first text fragment only


def only_text(e):
    return " ".join(all_text(e))  # all text under e as one cleaned string


def url2str(url: str) -> str:
    """Fetch url and return its HTML with the <head> and all <script> blocks removed."""
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    # bad_html = requests.get(url, headers=headers)
    bad_html = requests.get(url)  # headers currently unused; see variant above
    tree = BeautifulSoup(bad_html.text, features="lxml")
    xml_str = str(tree)
    xml_str = remove_tag(xml_str, "head")
    xml_str = remove_tag(xml_str)
    # with open("test.html", "w") as f:
    #     f.write(xml_str)
    return xml_str


# aliases
rb = remove_between
cw = clear_whitespace
ot = only_text
at = all_text
oft = only_first_text


class WordParser:
    """Base class for per-word page parsers. To work with Queue, subclasses
    must additionally provide:
    - self.neighbours: words found on the fetched page
    - self.todict(): a dict with the parsed info"""

    def __init__(self, word, url_prefix):
        self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.word = word
        self.url = f"{url_prefix}{word}"
        self.xml_string = url2str(self.url)
        self.root = ET.fromstring(self.xml_string)
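
# A minimal sketch of a concrete parser (hypothetical subclass and URL; the
# real parsers live in the per-dictionary scripts of this repo):
#
#   class ExampleParser(WordParser):
#       def __init__(self, word):
#           super().__init__(word, "https://example.com/dictionary/")
#           self.neighbours = []  # would be filled from links in self.root
#
#       def todict(self):
#           return {self.word: {"time_of_retrieval": self.time}}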


class FileSet(set):
    """A set of stripped lines backed by a plain text file (one entry per line)."""

    def __init__(self, file):
        self.file = file
        super().__init__({line.strip() for line in open(self.file, "r")})

    def load(self):
        self.update({line.strip() for line in open(self.file, "r")})

    def save(self):
        if self:
            with open(self.file, "w") as f:
                f.write("\n".join([w for w in self if w]))

    def append(self):
        # merge whatever is currently on disk into the set, then write back
        if self:
            self |= {line.strip() for line in open(self.file, "r")}
            self.save()
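
# Example (assuming the file already exists; FileSet does not create it):
#   >>> queue = FileSet("en_merriam_webster/queue")
#   >>> queue |= {"serendipity"}
#   >>> queue.save()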


class Dictionary(dict):
    """All JSON database shards matching dir_prefix/*suffix merged into one dict."""

    def __init__(self, dir_prefix, suffix):
        self.__dict__.update(locals())  # store constructor args as attributes
        full_dict = {}
        start = time.time()
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                full_dict |= json.load(f)
        self.readtime = time.time() - start
        super().__init__(full_dict)
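
# Example (hypothetical directory and suffix; any directory of JSON shards works):
#   >>> d = Dictionary("en_merriam_webster/", ".json")
#   >>> len(d), d.readtime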


class Queue:
    def __init__(
        self,
        Parser,
        dir_prefix,
        suffix,
        time_base=1.01,
        time_exponent=10,
        prefix_length=1,
    ):
        self.__dict__.update(locals())  # store all constructor args as attributes
        self.letters = string.ascii_lowercase
        self.full_dict = {}
        self.queue = FileSet(f"{dir_prefix}queue")
        self.snafus = FileSet(f"{dir_prefix}snafus")
        self.redo = FileSet(f"{dir_prefix}redo")

    def wait(self):
        # sleep between time_base**time_exponent and time_base**(3*time_exponent) seconds
        a = self.time_base**self.time_exponent
        b = self.time_base ** (self.time_exponent * 3)
        time.sleep(randtime(a, b))
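
    # With the defaults wait() is a gentle rate limit: 1.01**10 ≈ 1.105 s up to
    # 1.01**30 ≈ 1.348 s per word; each ConnectionError in add_word below bumps
    # time_exponent by one, stretching both bounds.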

    def loadDB(self):
        # merge every JSON shard in dir_prefix into the in-memory dict
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                self.full_dict |= json.load(f)
        self.full_dict = uqall(self.full_dict)

    def updateDB(self, pick):
        # rewrite only the shard whose filename prefix matches the picked word
        start = time.time()
        prefix = pick[: self.prefix_length].lower()
        if all([c.lower() in self.letters for c in prefix]):
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if k[: self.prefix_length].lower() == prefix
            }
        else:
            # words with non-ascii-letter prefixes share one "_" shard
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if any([c.lower() not in self.letters for c in k[: self.prefix_length]])
            }
            prefix = "_" * self.prefix_length
        with open(f"{self.dir_prefix}{prefix}{self.suffix}", "w") as f:  # save DB
            json.dump(c_db, f, separators=(",", ":"), indent=2, sort_keys=False)
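
    # e.g. with prefix_length=1, "serendipity" lands in <dir_prefix>s<suffix>,
    # while "østers" (non-ascii first letter) lands in <dir_prefix>_<suffix>.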

    def add_word(self):
        self.redo.load()
        self.queue -= set(self.full_dict.keys())  # drop words already parsed
        self.queue -= self.snafus  # drop known failures
        self.queue |= self.redo
        len_queue = len(self.queue)  # actual queue
        p = random.choice(list(self.queue))
        try:
            start_parsing = time.time()
            w = self.Parser(p)  # fetch new word
            word_dict = w.todict()
            start_db_stuff = time.time()
            self.full_dict |= word_dict
            print(
                f"{p} | "
                f"{len(self.full_dict)} words collected, "
                f"{len_queue} words waiting in queue"
                # f", {start_db_stuff-start_parsing:.06f}s"
                # f"/{time.time() - start_db_stuff:.06f}s"
            )
            self.queue |= set(w.neighbours)  # grow the queue from the new page
            self.queue -= {p}
            self.redo -= {p}
            self.redo.save()
            self.updateDB(p)
            self.wait()
        except (
            AssertionError,
            ET.ParseError,
        ):
            # page did not parse: remember the word as a snafu and move on
            self.queue.save()
            print("snafu... ", p)
            self.redo -= {p}
            self.redo.save()
            self.snafus |= {p}
            self.snafus.append()
            self.wait()
        except ConnectionError:
            # back off: a higher exponent widens the wait interval
            self.queue.save()
            self.time_exponent += 1
            self.wait()


if __name__ == "__main__":
    f = FileSet("en_merriam_webster/queue")
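
# A sketch of how the pieces fit together (using the hypothetical ExampleParser
# from above; the real entry points live in the per-dictionary scripts):
#
#   q = Queue(ExampleParser, "en_merriam_webster/", ".json")
#   q.loadDB()
#   while True:
#       q.add_word()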