# Listing metadata from the code viewer: 307 lines, 8.4 KiB, Python.
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import string
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from urllib.parse import unquote
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from requests.exceptions import ConnectionError
|
|
|
|
# Short alias for URL percent-decoding.  An earlier variant, kept for
# reference, additionally cut off everything from the first "?":
#     def uq(s):
#         return unquote(s).split("?")[0]
uq = unquote

# Lowercase ASCII alphabet ("a".."z"); used to test file-name prefixes.
letters = string.ascii_lowercase
|
|
|
|
|
|
def uqall(data):
    """Percent-decode every string inside a nested list/dict structure.

    Lists and dicts are modified in place and walked recursively.  Dict keys
    containing ``%`` are replaced by their decoded form (the entry moves to
    the end of the dict); string values containing ``%`` are decoded.

    Args:
        data: A list, a dict, or a single string.

    Returns:
        The list or dict that was passed in, after decoding.  A bare string
        is wrapped first, so ``str`` input yields a one-element list holding
        the decoded string.

    Raises:
        TypeError: If ``data`` is not a list, dict or str.
    """
    if isinstance(data, str):
        # Wrap a lone string so the loop below can treat it like a list.
        data = [data]

    if isinstance(data, list):
        entries = enumerate(data)
    elif isinstance(data, dict):
        # Rename over a snapshot of the keys; the dict is changed meanwhile.
        for key in list(data):
            # Only string keys can hold percent-escapes.  Other hashable keys
            # (ints, tuples, ...) are skipped instead of failing on `in`.
            if isinstance(key, str) and "%" in key:
                data[uq(key)] = data.pop(key)
        entries = list(data.items())
    else:
        raise TypeError("can only traverse list or dict")

    for index, value in entries:
        if isinstance(value, (list, dict)):
            uqall(value)
        elif isinstance(value, str) and "%" in value:
            data[index] = uq(value)
    return data
|
|
|
|
|
|
def clear_whitespace(data):
    """Collapse whitespace runs in every string of a nested list/dict.

    Each string value has runs of whitespace replaced by a single space and
    is then stripped at both ends.  Lists and dicts are edited in place and
    walked recursively; dict keys are left untouched.

    Args:
        data: A list, a dict, or a single string.

    Returns:
        The list or dict that was passed in.  A bare string is wrapped first,
        so ``str`` input yields a one-element list with the cleaned string.

    Raises:
        TypeError: If ``data`` is not a list, dict or str.
    """
    if isinstance(data, str):
        data = [data]

    if isinstance(data, list):
        pairs = enumerate(data)
    elif isinstance(data, dict):
        pairs = data.items()
    else:
        raise TypeError("can only traverse list or dict")

    for slot, item in pairs:
        if isinstance(item, str):
            collapsed = re.sub(r"[\n\t\s]+", " ", item)
            data[slot] = collapsed.strip()
        elif isinstance(item, (list, dict)):
            clear_whitespace(item)
    return data
|
|
|
|
|
|
def randtime(a, b, k=0):
    """Draw uniformly distributed random numbers between ``a`` and ``b``.

    Args:
        a: One end of the interval.
        b: The other end of the interval.
        k: How many values to draw.  A falsy value (the default) asks for a
            single number instead of a list.

    Returns:
        One float when ``k`` is falsy, otherwise a list of ``k`` floats.
    """
    if not k:
        return random.uniform(a, b)
    return [random.uniform(a, b) for _ in range(k)]
|
|
|
|
|
|
def remove_between(string, a, b):
    """Remove every span that starts with ``a`` and ends with ``b``.

    The delimiters themselves are removed too.  Spans are removed innermost
    first: for each closing delimiter, the nearest opening delimiter before
    it is used.  A closing delimiter with no opener before it is left alone.

    Args:
        string: Text to clean.
        a: Opening delimiter.
        b: Closing delimiter.

    Returns:
        The text without the delimited spans, stripped of leading and
        trailing whitespace.  If either delimiter is empty nothing is
        removed and only the strip is applied.
    """
    if not a or not b:
        # An empty delimiter would match everywhere; there is no span to cut.
        return string.strip()

    while True:
        # Find the first closer that has an opener somewhere before it.
        close_at = string.find(b)
        open_at = -1
        while close_at != -1:
            open_at = string.rfind(a, 0, close_at)
            if open_at != -1:
                break
            # Stray closer: skip it and look at the next one.
            close_at = string.find(b, close_at + 1)

        if close_at == -1:
            return string.strip()

        # Positions are compared against -1, so an opener at index 0 counts.
        string = string[:open_at] + string[close_at + len(b):]
|
|
|
|
|
|
def remove_tag(string, tag="script"):
    """Remove all ``<tag ...>...</tag>`` elements from a markup string.

    Matching is textual and case-sensitive.  An opening tag is ``<tag``
    followed by whitespace, ``/`` or ``>``, so asking for ``head`` does not
    match ``<header>``.  Elements are removed innermost first: for each
    ``</tag>``, the nearest opening tag before it is used.  A closing tag
    with no opening tag before it is left in place.

    Args:
        string: Markup to clean.
        tag: Name of the element to remove.

    Returns:
        The markup without the matching elements.  Unlike
        ``remove_between`` the result is not stripped.
    """
    opener = re.compile(rf"<{re.escape(tag)}(?=[\s/>])")
    ctag = f"</{tag}>"

    while True:
        # Find the first closing tag that has an opening tag before it.
        close_at = string.find(ctag)
        open_at = -1
        while close_at != -1:
            starts = [m.start() for m in opener.finditer(string, 0, close_at)]
            if starts:
                open_at = starts[-1]
                break
            # Stray closing tag: skip it and try the next one.
            close_at = string.find(ctag, close_at + 1)

        if close_at == -1:
            return string

        # An opening tag at index 0 is a valid position, not "missing".
        string = string[:open_at] + string[close_at + len(ctag):]
|
|
|
|
|
|
def all_text(e):
    """Collect all text inside element ``e`` with whitespace normalised.

    The text pieces from ``e.itertext()`` are joined with single spaces and
    passed to ``clear_whitespace``.  Because ``clear_whitespace`` wraps a
    bare string in a list, the result is a one-element list.
    """
    joined = " ".join(e.itertext())
    return clear_whitespace(joined)
|
|
|
|
|
|
def only_first_text(e):
    """Return the first text piece of element ``e``, whitespace-normalised.

    Raises:
        StopIteration: If ``e.itertext()`` yields nothing.
    """
    first_piece = next(e.itertext())
    cleaned = clear_whitespace([first_piece])
    return cleaned[0]
|
|
|
|
|
|
def only_text(e):
    """Return all text inside element ``e`` as one normalised string.

    Joins the list produced by ``all_text`` with single spaces.
    """
    pieces = all_text(e)
    return " ".join(pieces)
|
|
|
|
|
|
def url2str(url: str, clean=True) -> str:
    """Download ``url`` and return its markup after a few textual repairs.

    Args:
        url: Address of the page to fetch.
        clean: When true, the response text is parsed with BeautifulSoup's
            ``html.parser`` and serialised again before the repairs; when
            false the raw response text is used.

    Returns:
        The patched markup with every ``head`` element removed.

    Side effects:
        Writes the returned markup to ``test.html`` in the current working
        directory on every call.
    """
    # Browser-like headers.  They are currently NOT sent: the request that
    # used them is disabled below.
    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.115 Safari/537.36"
    }
    # bad_html = requests.get(url, headers=headers)
    bad_html = requests.get(url)

    if clean:
        tree = BeautifulSoup(bad_html.text, features="html.parser")
        xml_str = str(tree)
    else:
        xml_str = bad_html.text

    # Replace "--" plus one character on each side with "-->", when the
    # preceding character is not "!" and the following one is not ">".
    # NOTE(review): presumably repairs malformed comment markers before XML
    # parsing; the two neighbouring characters are consumed too -- confirm.
    xml_str = re.sub(r"[^!]--[^>]", "-->", xml_str)
    # A literal "<>" is likewise turned into "-->".
    xml_str = re.sub(r"<>", "-->", xml_str)
    xml_str = remove_tag(xml_str, "head")
    # xml_str = remove_tag(xml_str)

    # Debug dump of the last page.  The encoding is explicit because the
    # platform default may be unable to represent characters on the page.
    with open("test.html", "w", encoding="utf-8") as f:
        f.write(xml_str)
    return xml_str
|
|
|
|
|
|
# Short aliases for the text helpers defined in this module.
at = all_text
cw = clear_whitespace
oft = only_first_text
ot = only_text
rb = remove_between
|
|
|
|
|
|
class WordParser:
    """Fetch the page for one word and parse it into an element tree.

    To work with Queue, a parser additionally has to provide:

    - ``neighbours``: the words found on the site
    - ``todict()``: returns a dict with the parsed info
    """

    def __init__(self, word, url_prefix, clean=True):
        """Download ``url_prefix + word`` and parse the result.

        Args:
            word: Word as it appears in the URL (may be percent-encoded).
            url_prefix: Text placed in front of ``word`` to form the URL.
            clean: Passed on to ``url2str``.

        Raises:
            xml.etree.ElementTree.ParseError: If the fetched markup is not
                well-formed XML.
        """
        # Timestamp taken before the download starts.
        now = datetime.now()
        self.time = now.strftime("%Y%m%d-%H%M%S")
        self.word = uq(word)
        # The URL uses the word as given, not the decoded form.
        self.url = f"{url_prefix}{word}"
        page = url2str(self.url, clean=clean)
        self.xml_string = page
        self.root = ET.fromstring(page)
|
|
|
|
|
|
class FileSet(set):
    """A set of non-empty strings backed by a text file, one entry per line.

    Lines are stripped when read; blank lines are ignored.  The file is only
    touched by ``load``, ``save`` and ``append``; ordinary set operations
    change the in-memory set alone.
    """

    def __init__(self, file):
        """Create the set from ``file`` if it exists, otherwise empty.

        Args:
            file: Path of the backing file.
        """
        self.file = file
        super().__init__(self._read_entries())

    def _read_entries(self):
        """Return the stripped, non-empty lines of the backing file."""
        if not os.path.isfile(self.file):
            return set()
        # The context manager closes the handle right after reading.
        with open(self.file, "r") as f:
            return {entry for entry in map(str.strip, f) if entry}

    def load(self):
        """Add the entries currently stored in the file to this set."""
        self.update(self._read_entries())

    def save(self, sort=False):
        """Overwrite the file with the entries of this set.

        An empty set is never written, so an existing file keeps its old
        content in that case.

        Args:
            sort: Write the entries in sorted order when true.
        """
        if not self:
            return
        entries = sorted(self) if sort else self
        with open(self.file, "w") as f:
            f.write("\n".join(w for w in entries if w))

    def append(self):
        """Merge the file's entries into this set, then save the union.

        Nothing is merged or written while the set is empty.
        """
        if self:
            self.update(self._read_entries())
        self.save()
|
|
|
|
|
|
class DictFile(dict):
    """A dict that is read from, and can be written back to, a JSON file."""

    def __init__(self, file):
        """Load ``file`` as JSON if it exists; otherwise start empty.

        Args:
            file: Path of the backing JSON file.
        """
        self.file = file
        if not os.path.isfile(file):
            # No backing file yet: the dict simply stays empty.
            return
        with open(file, "r") as handle:
            stored = json.load(handle)
        super().__init__(stored)

    def save(self):
        """Write the dict to its file as JSON (2-space indent, compact separators)."""
        dump_options = {"separators": (",", ":"), "indent": 2, "sort_keys": False}
        with open(self.file, "w") as handle:
            json.dump(self, handle, **dump_options)
|
|
|
|
|
|
class FullDictionary(dict):
    """All JSON part files of one directory merged into a single dict.

    Attributes:
        dir_prefix: Directory that was scanned.
        suffix: File-name suffix the part files had to end with.
        readtime: Seconds spent reading and merging the files.
    """

    def __init__(self, dir_prefix, suffix):
        """Merge every ``*<suffix>`` file found in ``dir_prefix``.

        When the same key appears in several files, the file read last wins.

        Args:
            dir_prefix: Directory containing the part files.
            suffix: Suffix selecting the part files, e.g. ``"_MW.json"``.
        """
        super().__init__()
        # Explicit attributes: copying locals() would also store a reference
        # to the object itself, creating a cycle that delays its release.
        self.dir_prefix = dir_prefix
        self.suffix = suffix

        start = time.time()
        for db_file in Path(dir_prefix).glob(f"*{suffix}"):
            with open(db_file, "r") as f:
                # Merge straight into self; no second full-size copy needed.
                self.update(json.load(f))
        self.readtime = time.time() - start
|
|
|
|
|
|
class Queue:
    """Crawl loop state: which words are done, pending, failed or to redo.

    Four FileSet files live directly under ``dir_prefix``:

    - ``words``: words already stored in the dictionary parts
    - ``queue``: words still to be fetched
    - ``snafus``: words whose page could not be parsed
    - ``redo``: words that should be fetched again

    Parsed entries are written to JSON part files named
    ``<dir_prefix><prefix><suffix>``, where ``prefix`` is derived from the
    first ``prefix_length`` characters of the fetched word.
    """

    def __init__(
        self,
        Parser,
        dir_prefix,
        suffix,
        time_base=1.01,
        time_exponent=10,
        prefix_length=3,
    ):
        """Set up the queue and load its four state files.

        Args:
            Parser: Callable taking a word and returning an object with a
                ``todict()`` method and a ``neighbours`` attribute.
            dir_prefix: Path prefix for the state files and part files.
            suffix: File-name suffix of the JSON part files.
            time_base: Base of the exponential used for the sleep interval.
            time_exponent: Starting exponent for the sleep interval.
            prefix_length: Number of leading characters that select a part file.
        """
        # Explicit attributes rather than copying locals(), which would also
        # store a reference from the object to itself.
        self.Parser = Parser
        self.dir_prefix = dir_prefix
        self.suffix = suffix
        self.time_base = time_base
        self.time_exponent = time_exponent
        self.prefix_length = prefix_length

        self.words = FileSet(f"{dir_prefix}words")
        self.queue = FileSet(f"{dir_prefix}queue")
        self.snafus = FileSet(f"{dir_prefix}snafus")
        self.redo = FileSet(f"{dir_prefix}redo")

    def unusual(self, prefix):
        """Return True if ``prefix`` has a non-letter or is shorter than ``prefix_length``."""
        only_letters = all(c in letters for c in prefix.lower())
        return not only_letters or len(prefix) < self.prefix_length

    def wait(self):
        """Occasionally save state, then sleep for a random interval.

        The sleep lasts between ``time_base ** e`` and ``time_base ** (3 * e)``
        seconds, where ``e`` is the absolute value of ``time_exponent``.
        """
        # Save only when the current Unix second is a multiple of ten.
        if int(time.time()) % 10 == 0:  # cron job
            self.words.save(sort=True)
            self.queue.save()
        self.time_exponent = abs(self.time_exponent)
        a = self.time_base**self.time_exponent
        b = self.time_base ** (self.time_exponent * 3)
        time.sleep(randtime(a, b))

    def loadDB(self):
        """Add the keys of every JSON part file to ``words``.

        Prints the file name and terminates the program if a part file does
        not contain valid JSON.
        """
        for db_file in Path(self.dir_prefix).glob(f"*{self.suffix}"):
            with open(db_file, "r") as f:
                try:
                    self.words |= set(json.load(f).keys())
                except json.decoder.JSONDecodeError:
                    print(db_file, " corrupted")
                    # Same effect as exit(), without needing the site module.
                    raise SystemExit

    def pick_random(self):
        """Choose the next word to fetch and adjust the sleep exponent.

        The queue is first reduced by known words and snafus and extended by
        the reloaded ``redo`` file.  With an empty queue a random known word
        is chosen and the exponent raised by 1; otherwise a random queued
        word is chosen and the exponent lowered by 20.

        Returns:
            The chosen word.

        Raises:
            IndexError: If both the queue and ``words`` are empty.
        """
        self.redo.load()
        self.queue -= self.words
        self.queue -= self.snafus
        self.queue |= self.redo
        if len(self.queue) < 1:
            p = random.choice(list(self.words))
            self.time_exponent += 1
        else:
            p = random.choice(list(self.queue))
            self.time_exponent -= 20
        return p

    def add_word(self, p=None):
        """Fetch one word, store its entries and update the state files.

        Args:
            p: Word to fetch; a word is picked with ``pick_random`` when None.

        On AssertionError or ET.ParseError the word is recorded in ``snafus``.
        On ConnectionError the queue is saved and the sleep exponent raised.
        Every path ends with a call to ``wait``.
        """
        if p is None:
            p = self.pick_random()
        try:
            w = self.Parser(p)  # fetch new word
            print(p)
            word_dict = w.todict()

            # Words with a short or non-letter prefix share the "_" part file.
            prefix = p[: self.prefix_length].lower()
            if self.unusual(prefix):
                prefix = "_" * self.prefix_length

            dict_part = DictFile(f"{self.dir_prefix}{prefix}{self.suffix}")
            dict_part |= word_dict
            dict_part.save()
            del dict_part

            self.words |= set(word_dict.keys())

            self.queue |= set(w.neighbours)
            self.queue -= {p}
            self.redo -= {p}
            self.redo.save()
            self.wait()
        except (
            AssertionError,
            ET.ParseError,
        ):
            self.queue.save()
            print("snafu... ", p)
            self.redo -= {p}
            self.redo.save()
            self.snafus |= {p}
            self.snafus.append()
            self.wait()
        except ConnectionError:
            self.queue.save()
            self.time_exponent += 1
            self.wait()
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual check: load all "*_MW.json" parts, print the number of entries,
    # then delete the object with a pause before and after.
    # NOTE(review): this used to call `collect_words`, which is not defined
    # in the visible file.  FullDictionary takes the same two arguments and
    # provides the `readtime` mentioned below -- confirm it is the intended
    # replacement.
    d = FullDictionary("en_MerriamWebster/", "_MW.json")
    print(len(d))
    # print(d.readtime)
    time.sleep(3)
    print("del")
    del d
    time.sleep(3)
|