"""Web-scraping helpers for building word dictionaries from online sources."""
import json
|
|
import random
|
|
import re
|
|
import string
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from requests.exceptions import ConnectionError
|
|
|
|
|
|
def randtime(a, b, k=0):
    """Return a random float in [a, b], or a list of k such floats when k > 0."""
    if not k:
        return random.uniform(a, b)
    return [random.uniform(a, b) for _ in range(k)]
|
|
|
|
|
|
def remove_between(string, a, b):
    """Recursively delete every span that starts with *a* and ends with *b*.

    Both delimiters are removed together with the text between them; the
    final result is stripped of surrounding whitespace.

    Args:
        string: text to clean (parameter name kept for API compatibility,
            although it shadows the stdlib ``string`` module).
        a: opening delimiter.
        b: closing delimiter.

    Returns:
        The cleaned, stripped text.
    """
    # str.find returns -1 on a miss, so a delimiter at index 0 is handled
    # correctly — the original position-based truthiness check treated a
    # match at position 0 as "not found" and skipped the removal.
    start = string.find(a)
    if start != -1:
        end = string.find(b, start + len(a))
        if end != -1:
            return remove_between(string[:start] + string[end + len(b):], a, b)
    return string.strip()
|
|
|
|
|
|
def remove_tag(string, tag="script"):
    """Recursively strip every ``<tag ...>...</tag>`` element from *string*.

    The opening delimiter is ``<tag`` (no closing ``>``) so attributes are
    matched as well; everything through the matching ``</tag>`` is removed.

    Args:
        string: markup to clean.
        tag: tag name to remove; defaults to "script".

    Returns:
        The markup with all such elements removed (not stripped).
    """
    otag = f"<{tag}"
    ctag = f"</{tag}>"
    # str.find returns -1 on a miss, so an element starting at index 0 is
    # removed correctly — the original truthiness check on positions
    # treated an opening tag at position 0 as "not found".
    start = string.find(otag)
    if start != -1:
        end = string.find(ctag, start)
        if end != -1:
            return remove_tag(string[:start] + string[end + len(ctag):], tag)
    return string
|
|
|
|
|
|
def all_text(e):
    """Return the element's whitespace-normalized text content.

    Note: ``clear_whitespace`` wraps a bare string in a list, so the
    result is a single-element list containing the cleaned text.
    """
    joined = " ".join(e.itertext())
    return clear_whitespace(joined)
|
|
|
|
|
|
def only_text(e):
    """Return the whitespace-normalized text of element *e* as one string."""
    parts = all_text(e)
    return " ".join(parts)
|
|
|
|
|
|
def clear_whitespace(data):
    """Collapse runs of whitespace to single spaces in every string of *data*.

    Lists and dicts are traversed recursively and modified in place, then
    returned.  A bare string is wrapped in a one-element list and that list
    is returned — callers such as ``all_text`` rely on getting a list back.

    Args:
        data: a list, dict, or str to clean.

    Returns:
        The (mutated) container.

    Raises:
        TypeError: if *data* is not a list, dict, or str.
    """
    if isinstance(data, str):
        data = [data]
    if isinstance(data, list):
        items = enumerate(data)
    elif isinstance(data, dict):
        items = data.items()
    else:
        raise TypeError("can only traverse list or dict")
    for key, value in items:
        if isinstance(value, (list, dict)):
            clear_whitespace(value)
        elif isinstance(value, str):
            # \s already covers \n and \t; the original class [\n\t\s]+ was redundant.
            data[key] = re.sub(r"\s+", " ", value).strip()
    return data
|
|
|
|
|
|
def url2str(url: str) -> str:
    """Fetch *url* and return its HTML with <head> and <script> elements removed."""
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    # NOTE(review): `headers` is built but never passed to requests.get below;
    # the commented-out call suggests user-agent spoofing was tried and then
    # disabled — confirm whether it should be re-enabled or removed.
    # bad_html = requests.get(url, headers=headers)
    bad_html = requests.get(url)
    # Round-trip through BeautifulSoup/lxml to normalize malformed markup.
    tree = BeautifulSoup(bad_html.text, features="lxml")
    xml_str = str(tree)
    # Strip the <head> element and all <script> elements (remove_tag default).
    xml_str = remove_tag(xml_str, "head")
    xml_str = remove_tag(xml_str)
    # with open("test.html", "w") as f:
    #     f.write(xml_str)
    return xml_str
|
|
|
|
|
|
# Short aliases for the most frequently used helpers.
rb = remove_between
cw = clear_whitespace
ot = only_text
|
|
|
|
|
|
class WordParser:
    """Base class for site-specific word parsers.

    Subclasses need additional members to work with Queue:
    - self.neighbours = words found on the site
    - self.todict() = returning a dict with the parsed info
    """

    def __init__(self, word, url_prefix):
        # Fetch timestamp, e.g. "20240131-235959".
        self.time = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.word = word
        self.url = f"{url_prefix}{word}"
        # Network I/O happens here: download and sanitize the page.
        self.xml_string = url2str(self.url)
        # May raise ET.ParseError on malformed markup; Queue.add_word catches it.
        self.root = ET.fromstring(self.xml_string)
|
|
|
|
|
|
class FileSet(set):
    """A set of strings mirrored to a text file, one entry per line.

    The original implementation opened the backing file in three places
    without ever closing it; reads now go through a single context-managed
    helper.
    """

    def __init__(self, file):
        # Path of the backing file (read immediately; must exist).
        self.file = file
        super().__init__(self._read())

    def _read(self):
        """Read the backing file and return its lines as a stripped set."""
        with open(self.file, "r") as f:
            return {line.strip() for line in f}

    def load(self):
        """Merge entries from the backing file into this set."""
        self.update(self._read())

    def save(self):
        """Overwrite the backing file with the current entries (one per line)."""
        with open(self.file, "w") as f:
            f.write("\n".join(self))

    def append(self):
        """Merge the file's current entries into this set, then write back."""
        self |= self._read()
        self.save()
|
|
|
|
|
|
class Dictionary(dict):
    """A dict aggregated from all JSON shards ``{dir_prefix}*{suffix}``.

    Attributes:
        dir_prefix: directory prefix the shards are globbed from.
        suffix: filename suffix of the shards (e.g. ".json").
        readtime: seconds spent loading all shards.
    """

    def __init__(self, dir_prefix, suffix):
        # Explicit assignments instead of self.__dict__.update(locals()),
        # which also stored a useless circular `self.self` reference.
        self.dir_prefix = dir_prefix
        self.suffix = suffix
        full_dict = {}
        start = time.time()
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                full_dict |= json.load(f)
        self.readtime = time.time() - start
        super().__init__(full_dict)
|
|
|
|
|
|
class Queue:
    """Drives the crawl: picks queued words, parses them, persists shards.

    Args:
        Parser: a WordParser-like class called as ``Parser(word)``; it must
            expose ``.todict()`` and ``.neighbours``.
        dir_prefix: directory prefix holding the queue/snafus/redo files
            and the per-prefix JSON shards.
        suffix: filename suffix of the JSON shards (e.g. ".json").
        time_base: base of the randomized backoff used by wait().
        time_exponent: exponent of the backoff; grows on connection errors.
        prefix_length: length of the word prefix used to shard the database.
    """

    def __init__(
        self,
        Parser,
        dir_prefix,
        suffix,
        time_base=1.01,
        time_exponent=10,
        prefix_length=1,
    ):
        # Explicit assignments instead of self.__dict__.update(locals()),
        # which also stored a useless circular `self.self` reference.
        self.Parser = Parser
        self.dir_prefix = dir_prefix
        self.suffix = suffix
        self.time_base = time_base
        self.time_exponent = time_exponent
        self.prefix_length = prefix_length
        self.letters = string.ascii_lowercase
        self.full_dict = {}
        self.queue = FileSet(f"{dir_prefix}queue")
        self.snafus = FileSet(f"{dir_prefix}snafus")
        self.redo = FileSet(f"{dir_prefix}redo")

    def wait(self):
        """Sleep a random interval in [base**exp, base**(3*exp)] seconds."""
        a = self.time_base**self.time_exponent
        b = self.time_base ** (self.time_exponent * 3)
        time.sleep(randtime(a, b))

    def loadDB(self):
        """Merge every JSON shard under dir_prefix into self.full_dict."""
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                self.full_dict |= json.load(f)

    def updateDB(self, pick):
        """Rewrite the JSON shard that contains the word *pick*.

        Words whose prefix consists purely of ASCII letters go to the shard
        named after the lowercased prefix; everything else goes to the
        "_" shard.  (The unused `start = time.time()` local was removed.)
        """
        prefix = pick[: self.prefix_length].lower()
        if all(c.lower() in self.letters for c in prefix):
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if k[: self.prefix_length].lower() == prefix
            }
        else:
            c_db = {
                k: v
                for k, v in self.full_dict.items()
                if any(c.lower() not in self.letters for c in k[: self.prefix_length])
            }
            prefix = "_" * self.prefix_length

        with open(f"{self.dir_prefix}{prefix}{self.suffix}", "w") as f:  # save DB
            json.dump(c_db, f, separators=(",", ":"), indent=2, sort_keys=True)

    def add_word(self):
        """Pick a random queued word, parse it, and persist the results.

        Parse failures are recorded as snafus; connection errors increase
        the backoff exponent; any other error saves the queue and exits.
        """
        self.redo.load()
        # Drop words already collected or known-bad; re-add forced redos.
        self.queue -= set(self.full_dict.keys())
        self.queue -= self.snafus
        self.queue |= self.redo
        len_queue = len(self.queue)  # actual queue size before picking
        p = random.choice(list(self.queue))
        try:
            start_parsing = time.time()
            w = self.Parser(p)  # fetch new word (network I/O)
            word_dict = w.todict()
            start_db_stuff = time.time()
            self.full_dict |= word_dict

            self.queue |= set(w.neighbours)
            self.queue -= {p}
            self.redo -= {p}
            self.redo.save()
            self.updateDB(p)
            print(
                f"{len(self.full_dict)} words collected, "
                f"{len_queue} words waiting in queue, "
                f"{start_db_stuff-start_parsing:.06f}s"
                f"/{time.time() - start_db_stuff:.06f}s"
            )
            self.wait()
        except (
            AssertionError,
            ET.ParseError,
        ):
            # Page could not be parsed: blacklist the word so it is
            # never queued again.
            self.queue.save()
            print("snafu... ", p)
            self.redo -= {p}
            self.redo.save()
            self.snafus |= {p}
            self.snafus.append()
            self.wait()
        except ConnectionError:
            # Back off harder on connection problems before retrying.
            self.queue.save()
            self.time_exponent += 1
            self.wait()
        except BaseException:
            # Explicit replacement for the original bare `except:` —
            # persist the queue on any other failure (incl. Ctrl-C), then exit.
            self.queue.save()
            raise SystemExit
|
|
|
|
|
|
if __name__ == "__main__":
    # Ad-hoc smoke test: load the Merriam-Webster queue file into a FileSet.
    f = FileSet("en_merriam_webster/queue")
|