"""Crawl Merriam-Webster dictionary pages and store the parsed entries.

The script keeps a queue of words in ``queue.db`` (one word per line). It
repeatedly picks a random queued word, scrapes its dictionary page, stores
the words, senses, descriptions and pronunciations found there, and adds
every dictionary link on the page to the queue.

The database URL is read from the first line of ``db.secrets``.
"""

import random
import re
from pathlib import Path
from time import sleep
from typing import List
from urllib.parse import urlparse

from requests_html import HTMLSession
from sqlmodel import (
    ARRAY,
    Column,
    Field,
    Session,
    SQLModel,
    String,
    create_engine,
    select,
)

# File holding the crawl queue, one word per line.
QUEUE_FILE = "queue.db"
# The crawl stops once fewer than this many words remain queued.
MIN_QUEUE_SIZE = 20
# Upper bound, in seconds, of the random pause between two page requests.
MAX_DELAY_SECONDS = 5

# Strips a parenthesised qualifier from a part-of-speech label.
_PARENTHESISED = re.compile(r"\(.*\)")

with open("db.secrets", "r") as f:
    db_pass = f.readline().strip()


class Word(SQLModel, table=True):
    """A headword; ``word`` is unique and is the target of the foreign keys."""

    id: int | None = Field(default=None, primary_key=True)
    word: str = Field(index=True, unique=True)


class Sense(SQLModel, table=True):
    """One (headword, word class) combination."""

    id: int | None = Field(default=None, primary_key=True)
    word: str = Field(default=None, foreign_key="word.word")
    word_class: str | None


class Description(SQLModel, table=True):
    """A definition text with its example sentences, attached to a sense."""

    id: int | None = Field(default=None, primary_key=True)
    word: str | None = Field(default=None, foreign_key="word.word")
    sense_id: int | None = Field(default=None, foreign_key="sense.id")
    description: str
    examples: List[str] | None = Field(sa_column=Column(ARRAY(String)))


class Pronunciation(SQLModel, table=True):
    """A pronunciation string attached to a sense."""

    id: int | None = Field(default=None, primary_key=True)
    word: str | None = Field(default=None, foreign_key="word.word")
    sense: int | None = Field(default=None, foreign_key="sense.id")
    pronunciation: str


engine = create_engine(db_pass)

# Shared HTTP session; created on first use so that ``add_word`` also works
# when this module is imported instead of being run as a script.
html_session: HTMLSession | None = None


def _get_html_session() -> HTMLSession:
    """Return the shared ``HTMLSession``, creating it on first use."""
    global html_session
    if html_session is None:
        html_session = HTMLSession()
    return html_session


def add_word(word: str) -> set[str]:
    """Scrape the dictionary page for *word* and store what it contains.

    Every ``div.entry-word-section-container`` on the page is treated as one
    entry. For each entry the headword and its word class are looked up (or
    created), then its descriptions and pronunciations are added unless an
    equal row is already stored.

    Args:
        word: The word whose page is fetched; it is inserted verbatim into
            the page URL.

    Returns:
        The last path component of every ``/dictionary/`` link found in the
        successfully parsed entries.
    """
    url = f"https://www.merriam-webster.com/dictionary/{word}"
    response = _get_html_session().get(url)
    links: set[str] = set()

    with Session(engine) as session:
        for container in response.html.find("div.entry-word-section-container"):
            # ``find(..., first=True)`` yields None when nothing matches, so
            # both lookups are checked before ``.text`` is touched.
            headword_el = container.find("h1.hword,p.hword", first=True)
            if headword_el is None:
                print(f"fail: no headword in an entry for {word}")
                continue
            _word = headword_el.text

            class_el = container.find("h2.parts-of-speech", first=True)
            if class_el is None:
                print(f"fail: {_word}")
                continue
            _class = _PARENTHESISED.sub("", class_el.text).strip()

            links |= {
                Path(urlparse(link.attrs["href"]).path).name
                for link in container.find('a[href*="/dictionary/"]')
            }

            word_row = session.exec(
                select(Word).where(Word.word == _word)
            ).one_or_none()
            if word_row is None:
                word_row = Word(word=_word)
                session.add(word_row)

            # Running this query flushes a pending Word first, so the row
            # exists before a Sense referencing it is inserted.
            sense_row = session.exec(
                select(Sense)
                .where(Sense.word == _word)
                .where(Sense.word_class == _class)
            ).one_or_none()
            if sense_row is None:
                sense_row = Sense(word=word_row.word, word_class=_class)
                session.add(sense_row)
            session.commit()

            # Read once after the commit; the sense id is assigned by then.
            headword = word_row.word
            sense_id = sense_row.id

            for sense_el in container.find("div.sense"):
                description = "; ".join(
                    text_el.text
                    for dt_el in sense_el.find("span.dt")
                    for text_el in dt_el.find("span.dtText")
                )
                examples = [
                    "; ".join(sent.text for sent in thread.find("span.sents"))
                    for thread in sense_el.find("div.sub-content-thread")
                ]
                existing = session.exec(
                    select(Description).where(
                        Description.word == headword,
                        Description.description == description,
                    )
                ).one_or_none()
                if existing is not None:
                    continue
                session.add(
                    Description(
                        word=headword,
                        sense_id=sense_id,
                        description=description,
                        examples=examples,
                    )
                )

            for pron_el in container.find(
                "span.prons-entries-list-inline div.prons-entry-list-item,a.prons-entry-list-item"
            ):
                pronunciation = pron_el.text
                # Skip pronunciations that are already stored. ``first()``
                # rather than ``one_or_none()`` because the table may already
                # hold duplicates, which would make ``one_or_none()`` raise.
                stored = session.exec(
                    select(Pronunciation).where(
                        Pronunciation.word == headword,
                        Pronunciation.sense == sense_id,
                        Pronunciation.pronunciation == pronunciation,
                    )
                ).first()
                if stored is None:
                    session.add(
                        Pronunciation(
                            word=headword,
                            sense=sense_id,
                            pronunciation=pronunciation,
                        )
                    )
            session.commit()

    return links


def presently_available() -> list[str]:
    """Return the distinct words currently stored in the ``Word`` table.

    The rows are fetched while the session is still open, so the returned
    list does not depend on a database connection that has been released.
    """
    with Session(engine) as session:
        return list(session.exec(select(Word.word)).unique().all())


def _load_queue(path: str = QUEUE_FILE) -> set[str]:
    """Read the queue file into a set, ignoring blank lines."""
    with open(path, "rt") as queue_file:
        stripped = (line.strip() for line in queue_file)
        return {entry for entry in stripped if entry}


def _save_queue(queue: set[str], path: str = QUEUE_FILE) -> None:
    """Write the queue to *path*, one word per line."""
    with open(path, "wt") as queue_file:
        queue_file.write("\n".join(queue))


def main() -> None:
    """Crawl queued words until fewer than ``MIN_QUEUE_SIZE`` remain.

    The queue is written back to ``QUEUE_FILE`` when the crawl is
    interrupted with Ctrl-C or stopped by an error, so progress is kept.
    When the queue simply runs low, the file is left untouched.
    """
    SQLModel.metadata.create_all(engine)
    queue = _load_queue()
    try:
        while len(queue) >= MIN_QUEUE_SIZE:
            next_word = random.choice(list(queue))
            already_present = set(presently_available())
            print(next_word, len(queue), len(already_present))
            queue |= add_word(next_word)
            queue -= already_present | {next_word}
            sleep(random.random() * MAX_DELAY_SECONDS)
    except KeyboardInterrupt:
        _save_queue(queue)
    except Exception:
        # Keep the progress made so far, then let the error surface.
        _save_queue(queue)
        raise


if __name__ == "__main__":
    main()