# dict_dl/dict_dl.py

import json
import random
import re
import string
import time
from datetime import datetime
from pathlib import Path
from xml.etree import ElementTree as ET

import requests
from bs4 import BeautifulSoup
from requests.exceptions import ConnectionError


def randtime(a, b, k=0):
    """Return k random floats in [a, b], or a single float if k is 0."""
    if k:
        return [random.uniform(a, b) for _ in range(k)]
    return random.uniform(a, b)


def remove_between(text, a, b):
    """Recursively delete every substring that starts with a and ends with b."""
    # None sentinels (rather than 0) so a delimiter at index 0 is still found
    otag_pos = None
    ctag_pos = None
    for i in range(len(text)):
        if text[i : i + len(a)] == a:
            otag_pos = i
        elif text[i : i + len(b)] == b:
            ctag_pos = i + len(b)
        if otag_pos is not None and ctag_pos is not None:
            # cut out the span and rescan the remainder for further matches
            return remove_between(text[:otag_pos] + text[ctag_pos:], a, b)
    return text.strip()
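
# Example (illustrative): strip comment blocks from markup.
# remove_between("x <!-- note --> y", "<!--", "-->")  ->  "x  y"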


def remove_tag(text, tag="script"):
    """Recursively delete every <tag ...>...</tag> block from an HTML string."""
    otag = f"<{tag}"
    ctag = f"</{tag}>"
    otag_pos = None
    ctag_pos = None
    for i in range(len(text)):
        if text[i : i + len(otag)] == otag:
            otag_pos = i
        elif text[i : i + len(ctag)] == ctag:
            ctag_pos = i + len(ctag)
        if otag_pos is not None and ctag_pos is not None:
            return remove_tag(text[:otag_pos] + text[ctag_pos:], tag)
    return text
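
# Example (illustrative):
# remove_tag("<p>hi</p><script>var x;</script>")  ->  "<p>hi</p>"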


def all_text(e):
    """Collect the text below an element as a one-element list of clean text."""
    return clear_whitespace(" ".join(e.itertext()))


def only_text(e):
    """Flatten all text below an element into a single string."""
    return " ".join(all_text(e))


def clear_whitespace(data):
    """Collapse whitespace runs in every string of a nested list/dict.

    Containers are cleaned in place; a bare string comes back wrapped
    in a one-element list.
    """
    if isinstance(data, list):
        iterator = enumerate(data)
    elif isinstance(data, dict):
        iterator = data.items()
    elif isinstance(data, str):
        data = [data]
        iterator = enumerate(data)
    else:
        raise TypeError("can only traverse list, dict or str")
    for i, value in iterator:
        if isinstance(value, (list, dict)):
            clear_whitespace(value)
        elif isinstance(value, str):
            data[i] = re.sub(r"\s+", " ", value).strip()
    return data
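
# Example (illustrative): containers are cleaned in place.
# clear_whitespace({"w": "  a\n b ", "xs": [" c\t"]})  ->  {"w": "a b", "xs": ["c"]}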


def url2str(url: str) -> str:
    """Fetch a page and return its HTML with <head> and <script> blocks removed."""
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    bad_html = requests.get(url, headers=headers)
    tree = BeautifulSoup(bad_html.text, features="lxml")
    xml_str = str(tree)
    xml_str = remove_tag(xml_str, "head")
    xml_str = remove_tag(xml_str)  # default tag: script
    # with open("test.html", "w") as f:
    #     f.write(xml_str)
    return xml_str
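
# Example (illustrative; the URL is a placeholder):
# clean_html = url2str("https://example.com/word")  # fetched page, sans <head>/<script>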


# aliases
rb = remove_between
cw = clear_whitespace
ot = only_text


class WordParser:
    """Fetch a word's page and parse the cleaned HTML into an ElementTree root."""

    def __init__(self, word, url_prefix):
        self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.word = word
        self.url = f"{url_prefix}{word}"
        self.xml_string = url2str(self.url)
        self.root = ET.fromstring(self.xml_string)


class FileSet(set):
    """A set of strings mirrored to a newline-separated file."""

    def __init__(self, file):
        self.file = file
        with open(self.file, "r") as f:
            self.elements = {line.strip() for line in f}
        super().__init__(self.elements)

    def save(self):
        with open(self.file, "w") as f:
            f.write("\n".join(self))

    def append(self):
        # merge whatever is currently on disk back in, then rewrite the file
        with open(self.file, "r") as f:
            self |= {line.strip() for line in f}
        self.save()
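
# Example (illustrative; the path is a placeholder):
# words = FileSet("data/queue")   # loads one entry per line
# words |= {"serendipity"}
# words.save()                    # writes the set back to disk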


class Queue:
    """Work queue that fetches words via Parser and shards results into JSON files."""

    def __init__(
        self,
        Parser,
        dir_prefix,
        suffix,
        time_base=1.01,
        time_exponent=10,
        prefix_length=1,
    ):
        # bind every constructor argument as an attribute of the same name
        self.__dict__.update(locals())
        self.letters = string.ascii_lowercase
        self.full_dict = {}
        self.queue = FileSet(f"{dir_prefix}queue")
        self.snafus = FileSet(f"{dir_prefix}snafus")
        self.redo = FileSet(f"{dir_prefix}redo")

    def wait(self):
        # sleep between time_base**time_exponent and time_base**(3 * time_exponent) seconds
        a = self.time_base**self.time_exponent
        b = self.time_base ** (self.time_exponent * 3)
        time.sleep(randtime(a, b))

    def loadDB(self):
        # merge every existing JSON shard in dir_prefix into full_dict
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                self.full_dict |= json.load(f)

    def updateDB(self, pick):
        # rewrite the one JSON shard holding words that share pick's prefix
        prefix = pick[: self.prefix_length].lower()
        if all(c in self.letters for c in prefix):
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if k[: self.prefix_length].lower() == prefix
            }
        else:
            # words with non-alphabetic prefixes all land in one catch-all shard
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if any(c not in self.letters for c in k[: self.prefix_length])
            }
            prefix = "_" * self.prefix_length
        with open(f"{self.dir_prefix}{prefix}{self.suffix}", "w") as f:  # save DB
            json.dump(c_db, f, separators=(",", ":"), indent=2)

    def add_word(self):
        # pick a word that is queued or flagged for redo, excluding known
        # failures and words already collected
        p = random.choice(
            list((self.queue | self.redo) - self.snafus - self.full_dict.keys())
        )
        try:
            start_parsing = time.time()
            w = self.Parser(p)  # fetch new word
            word_dict = w.todict()
            start_db_stuff = time.time()
            self.full_dict |= word_dict
            for k in ["neighbors", "synonyms", "antonyms"]:
                if k in word_dict:
                    self.queue |= set(word_dict[k])
            self.queue -= {p}
            self.redo -= {p}
            self.updateDB(p)
            print(
                f"{len(self.full_dict)} words collected, "
                f"{len(self.queue)} words waiting in queue, "
                f"{start_db_stuff - start_parsing:.06f}s"
                f"/{time.time() - start_db_stuff:.06f}s"
            )
            self.wait()
        except (KeyboardInterrupt, AssertionError, ConnectionError) as e:
            self.queue.save()
            self.redo.save()
            # e is an exception instance, so test its type with isinstance
            # rather than comparing it to the class with ==
            if isinstance(e, KeyboardInterrupt):
                exit()
            elif isinstance(e, AssertionError):
                # assumed to be raised inside the parser, after w was bound
                print(w.time, p)
                self.snafus |= {p}
                self.snafus.append()
                self.wait()
            elif isinstance(e, ConnectionError):
                # back off more aggressively after network trouble
                self.time_exponent += 1
                self.wait()
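

# Usage sketch (illustrative, not part of the original module): a concrete
# parser is assumed to subclass WordParser and implement todict(); the class
# name, URL, directory and suffix below are hypothetical placeholders.
if __name__ == "__main__":

    class ExampleParser(WordParser):
        def __init__(self, word):
            # hypothetical dictionary site serving one page per word
            super().__init__(word, "https://example.com/dictionary/")

        def todict(self):
            # minimal payload: all visible page text, keyed by the word
            return {self.word: {"text": only_text(self.root)}}

    # dir_prefix must already contain "queue", "snafus" and "redo" files
    q = Queue(ExampleParser, "example_dict/", "_ex.json")
    q.loadDB()
    while True:
        q.add_word()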