#!/usr/bin/env python3 """ Парсер новостей с mont.ru → публикует в ZKART БД. Запуск: python3 mont_scraper.py [--all10] """ import re, os, sys, secrets, datetime, sqlite3, time, json from urllib.request import urlopen, Request, build_opener, HTTPCookieProcessor from http.cookiejar import CookieJar from urllib.parse import urlencode, urlparse from html import unescape DB_PATH = "/home/ruslan/docker/ZKART#/matrix.db" IMG_DIR = "/home/ruslan/docker/ZKART#/static/news_images" BASE_URL = "https://www.mont.ru" LIST_URL = "https://www.mont.ru/ru-ru/news?period=1" SITE_BASE = "https://maps.4mont.ru" TG_TOKEN = "8181219074:AAGvqWqb6t10YP4xpMOQnBq_6LrUqAFm5hM" TG_CHAT_ID = "54986411" MONT_EMAIL = "rgalyaviev@mont.com" MONT_PASS = "utOgbZ09mont" HEADERS = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", "Accept": "text/html,application/xhtml+xml,*/*;q=0.9"} os.makedirs(IMG_DIR, exist_ok=True) # ── Auth ────────────────────────────────────────────────────────────────────── def make_authenticated_opener() -> build_opener: """Login to mont.ru via OIDC and return an opener with auth cookies.""" jar = CookieJar() opener = build_opener(HTTPCookieProcessor(jar)) # Step 1: GET login → redirected to passport.mont.ru req = Request(f"{BASE_URL}/ru-ru/account/login", headers=HEADERS) with opener.open(req, timeout=20) as r: html = r.read().decode("utf-8", errors="replace") login_url = r.url form_action = re.search(r']+action="([^"]+)"', html) xsrf_m = re.search(r'name="idsrv\.xsrf"[^>]+value="([^"]+)"', html) if not form_action or not xsrf_m: raise RuntimeError("Login form not found") parsed = urlparse(login_url) action_url = f"{parsed.scheme}://{parsed.netloc}{form_action.group(1)}" # Step 2: POST credentials post_data = urlencode({ "username": MONT_EMAIL, "password": MONT_PASS, "idsrv.xsrf": xsrf_m.group(1) }).encode() req2 = Request(action_url, data=post_data, headers={**HEADERS, "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url}, method="POST") with opener.open(req2, timeout=20) as r: html2 = r.read().decode("utf-8", errors="replace") final_url = r.url # Step 3: form_post with id_token back to www.mont.ru form_action2 = re.search(r']+action="([^"]+)"', html2) if form_action2: action2 = form_action2.group(1) hidden = re.findall(r']+type="hidden"[^>]+name="([^"]+)"[^>]+value="([^"]*)"', html2) if not hidden: hidden = re.findall(r']+name="([^"]+)"[^>]+type="hidden"[^>]+value="([^"]*)"', html2) post_data3 = urlencode(dict(hidden)).encode() req3 = Request(action2, data=post_data3, headers={**HEADERS, "Content-Type": "application/x-www-form-urlencoded", "Referer": final_url}, method="POST") with opener.open(req3, timeout=20) as r: r.read() return opener # ── Helpers ─────────────────────────────────────────────────────────────────── def tg_notify(text: str): try: payload = json.dumps({"chat_id": TG_CHAT_ID, "text": text, "parse_mode": "HTML"}).encode() req = Request(f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage", data=payload, headers={"Content-Type": "application/json"}, method="POST") with urlopen(req, timeout=10): pass except Exception as e: print(f" [WARN] Telegram notify failed: {e}") def strip_tags(html): return unescape(re.sub(r"<[^>]+>", "", html)).strip() ALLOWED_TAGS = re.compile( r'<(/?)(' r'p|br|strong|b|em|i|u|s|ul|ol|li|a|h2|h3|h4|h5|blockquote|table|thead|tbody|tr|td|th' r')(\b[^>]*)?>', re.IGNORECASE ) ALLOWED_ATTRS = re.compile(r'\s+(href|target|rel)="([^"]*)"', re.IGNORECASE) DANGEROUS_PROTOCOLS = re.compile(r'^(javascript|vbscript|data):', re.IGNORECASE) def sanitize_html(html_body: str) -> str: """Keep formatting tags (bold, links, lists etc.) but strip everything unsafe.""" # Remove script/style blocks entirely html_body = re.sub(r'<(script|style)[^>]*>.*?', '', html_body, flags=re.IGNORECASE | re.DOTALL) # Remove HTML comments html_body = re.sub(r'', '', html_body, flags=re.DOTALL) result = [] pos = 0 for m in re.finditer(r'<[^>]+>', html_body): # Text before this tag — escape it result.append(unescape(html_body[pos:m.start()])) pos = m.end() tag = m.group(0) tag_m = ALLOWED_TAGS.match(tag) if not tag_m: continue # strip unknown/dangerous tags slash, name, attrs_raw = tag_m.group(1), tag_m.group(2).lower(), tag_m.group(3) or "" if slash: # closing tag result.append(f'') continue # Build safe attribute string safe_attrs = "" if name == "a": href_m = re.search(r'\bhref="([^"]*)"', attrs_raw, re.IGNORECASE) if href_m: href = href_m.group(1) if not DANGEROUS_PROTOCOLS.match(href.strip()): # Make relative mont.ru links absolute if href.startswith("/"): href = "https://www.mont.ru" + href safe_attrs = f' href="{href}" target="_blank" rel="noopener"' if name in ("br",): result.append(f'<{name} />') else: result.append(f'<{name}{safe_attrs}>') result.append(unescape(html_body[pos:])) return "".join(result).strip() def download_image(opener, img_src: str): """Download image from mont.ru, return local relative path or None.""" try: from urllib.parse import quote safe_path = quote(img_src, safe="/:.-_") if img_src.startswith("/") else img_src url = BASE_URL + safe_path if img_src.startswith("/") else safe_path ext = os.path.splitext(img_src.split("?")[0])[1].lower() or ".png" if ext not in (".jpg", ".jpeg", ".png", ".webp", ".gif"): ext = ".png" fname = f"news_{secrets.token_hex(8)}{ext}" path = os.path.join(IMG_DIR, fname) req = Request(url, headers=HEADERS) with opener.open(req, timeout=15) as resp: with open(path, "wb") as f: f.write(resp.read()) return f"news_images/{fname}" except Exception as e: print(f" [WARN] Image download failed: {e}") return None def slug_from(title, slug_id): slug = re.sub(r"[^a-z0-9а-яё]+", "-", title.lower()) slug = re.sub(r"[а-яё]", "", slug) slug = slug.strip("-")[:50] or f"mont-news-{slug_id}" return f"{slug}-{slug_id}" # ── News listing ────────────────────────────────────────────────────────────── def get_news_ids_from_listing(opener) -> tuple[list[str], dict[str, str]]: """Return (list of IDs, dict of id→img_src) from the listing page.""" req = Request(LIST_URL, headers=HEADERS) with opener.open(req, timeout=20) as r: html = r.read().decode("utf-8", errors="replace") # Pair images with the nearest following news link (within 2000 chars) imgs = [(m.start(), m.group(1)) for m in re.finditer(r'src="(/Content/Images/[^"]+)"', html)] links = [(m.start(), m.group(1)) for m in re.finditer(r'href="/ru-ru/news/(\d+)"', html)] id_to_img = {} for img_pos, img_src in imgs: for link_pos, art_id in links: if link_pos > img_pos and link_pos - img_pos < 2000: if art_id not in id_to_img: id_to_img[art_id] = img_src break # Full ordered list of IDs ids = list(dict.fromkeys(art_id for _, art_id in links)) return ids, id_to_img def get_max_slug_id() -> int: """Return the highest mont.ru article ID already in our DB.""" try: conn = sqlite3.connect(DB_PATH, timeout=10) rows = conn.execute("SELECT slug FROM news ORDER BY id DESC LIMIT 50").fetchall() conn.close() ids = [] for (slug,) in rows: m = re.search(r"-(\d{4,})$", slug) if m: ids.append(int(m.group(1))) return max(ids) if ids else 0 except Exception: return 0 def is_already_saved(slug_id: str) -> bool: conn = sqlite3.connect(DB_PATH, timeout=10) row = conn.execute("SELECT id FROM news WHERE slug LIKE ?", (f"%-{slug_id}",)).fetchone() conn.close() return row is not None # ── Fetch & save one article ────────────────────────────────────────────────── def fetch_and_save_article(opener, slug_id: str, listing_img: str = "") -> tuple[bool, str, str]: """ Fetch article from API, save to DB. Returns (saved: bool, title: str, slug: str) """ if is_already_saved(slug_id): print(f" [SKIP] Already exists: {slug_id}") return False, "", "" # Fetch article data via authenticated API api_url = f"{BASE_URL}/ru-ru/apiMvc/news/{slug_id}" req = Request(api_url, headers={**HEADERS, "Accept": "application/json, text/plain, */*"}) try: with opener.open(req, timeout=20) as r: data = json.loads(r.read().decode("utf-8", errors="replace")) except Exception as e: print(f" [WARN] API fetch failed for {slug_id}: {e}") return False, "", "" title = strip_tags(data.get("title", "")).strip() text_html = data.get("text", "") or "" body = sanitize_html(text_html) if not title or len(title) < 5: print(f" [SKIP] No title for {slug_id}") return False, "", "" # Check not a 404 page if "страница не найдена" in title.lower() or "404" in title: print(f" [SKIP] 404 page for {slug_id}") return False, "", "" print(f" [FETCH] {title[:70]}...") # Image: prefer listing image (most reliable), then API fields, then article page img_src = listing_img or data.get("image") or data.get("img") or data.get("previewImage") or "" image_path = None if img_src: image_path = download_image(opener, img_src) if not image_path: # Try scraping the article HTML page for an image try: req2 = Request(f"{BASE_URL}/ru-ru/news/{slug_id}", headers=HEADERS) with opener.open(req2, timeout=15) as r: pg = r.read().decode("utf-8", errors="replace") img_m = re.search(r'src="(/Content/Images/[^"]+)"', pg) if img_m: image_path = download_image(opener, img_m.group(1)) except Exception: pass slug = slug_from(title, slug_id) created_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn = sqlite3.connect(DB_PATH, timeout=15) try: conn.execute( "INSERT INTO news(title, body, slug, image, published, created_at) VALUES (?,?,?,?,1,?)", (title, body, slug, image_path, created_at) ) conn.commit() print(f" [OK] Published: {title[:70]}") except sqlite3.IntegrityError: slug = f"{slug}-{secrets.token_hex(3)}" conn.execute( "INSERT INTO news(title, body, slug, image, published, created_at) VALUES (?,?,?,?,1,?)", (title, body, slug, image_path, created_at) ) conn.commit() print(f" [OK] Published (alt slug): {title[:70]}") finally: conn.close() time.sleep(0.5) return True, title, slug # ── Main ────────────────────────────────────────────────────────────────────── def main(): all10 = "--all10" in sys.argv print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M}] Logging in to mont.ru...") errors = [] saved_count = 0 published = [] try: opener = make_authenticated_opener() except Exception as e: msg = f"Ошибка авторизации на mont.ru: {e}" print(f"Auth error: {e}") tg_notify(f"🚨 MONT парсер\n{msg}") return try: print("Fetching news listing...") listing_ids, id_to_img = get_news_ids_from_listing(opener) known_max = get_max_slug_id() probe_ids = [str(i) for i in range(known_max + 1, known_max + 6)] all_ids = list(dict.fromkeys(listing_ids + probe_ids)) if all10: candidate_ids = all_ids[:15] else: candidate_ids = [sid for sid in all_ids if not is_already_saved(sid)] if candidate_ids: print(f"Candidates: {candidate_ids}") for sid in candidate_ids: ok, title, slug = fetch_and_save_article(opener, sid, listing_img=id_to_img.get(sid, "")) if ok: saved_count += 1 published.append((title, slug)) if saved_count > 0: _, refreshed_imgs = get_news_ids_from_listing(opener) conn = sqlite3.connect(DB_PATH, timeout=15) for sid in candidate_ids: img_src = refreshed_imgs.get(sid) if img_src: row = conn.execute( "SELECT id, image FROM news WHERE slug LIKE ?", (f"%-{sid}",) ).fetchone() if row and not row[1]: path = download_image(opener, img_src) if path: conn.execute("UPDATE news SET image=? WHERE id=?", (path, row[0])) conn.commit() conn.close() else: print("No new news.") except Exception as e: msg = f"Ошибка парсинга новостей: {e}" print(f"News error: {e}") errors.append(msg) print(f"Done. News saved: {saved_count}") # Hide outdated events hidden_count = hide_outdated_events() if hidden_count: print(f"Hidden outdated events: {hidden_count}") # Scrape events ev_count, ev_published, ev_error = scrape_events(opener) if ev_error: errors.append(ev_error) # Telegram: send only if something new OR errors tg_lines = [] if saved_count > 0: suffix = "ь" if saved_count == 1 else "и" if 2 <= saved_count <= 4 else "ей" tg_lines.append(f"✅ Новости: {saved_count} новост{suffix}:") for title, slug in published: tg_lines.append(f' • {title}') if ev_count > 0: suffix = "е" if ev_count == 1 else "я" if 2 <= ev_count <= 4 else "й" tg_lines.append(f"📅 Мероприятия: {ev_count} мероприяти{suffix}:") for title, slug in ev_published: tg_lines.append(f' • {title}') for err in errors: tg_lines.append(f"🚨 {err}") if tg_lines: tg_notify("\n".join(tg_lines)) # ── Events scraper ──────────────────────────────────────────────────────────── EVENTS_LIST_URL = "https://www.mont.ru/ru-ru/events?eventPeriod=1" EVENTS_IMAGES_DIR = "/home/ruslan/docker/ZKART#/static/events_images" os.makedirs(EVENTS_IMAGES_DIR, exist_ok=True) def parse_event_date(raw: str) -> str | None: """Parse various date formats to YYYY-MM-DD, return None if unparseable.""" if not raw: return None raw = raw.strip() # ISO format m = re.match(r"(\d{4})-(\d{2})-(\d{2})", raw) if m: return f"{m.group(1)}-{m.group(2)}-{m.group(3)}" # DD.MM.YYYY or DD/MM/YYYY m = re.match(r"(\d{1,2})[./](\d{1,2})[./](\d{4})", raw) if m: return f"{m.group(3)}-{m.group(2).zfill(2)}-{m.group(1).zfill(2)}" # D Month YYYY (Russian) months_ru = {"января":"01","февраля":"02","марта":"03","апреля":"04","мая":"05","июня":"06", "июля":"07","августа":"08","сентября":"09","октября":"10","ноября":"11","декабря":"12"} m = re.match(r"(\d{1,2})\s+([а-яё]+)\s+(\d{4})", raw.lower()) if m: mon = months_ru.get(m.group(2)) if mon: return f"{m.group(3)}-{mon}-{m.group(1).zfill(2)}" return None def download_event_image(opener, img_src: str) -> str | None: try: from urllib.parse import quote safe_path = quote(img_src, safe="/:.-_") if img_src.startswith("/") else img_src url = BASE_URL + safe_path if img_src.startswith("/") else safe_path ext = os.path.splitext(img_src.split("?")[0])[1].lower() or ".png" if ext not in (".jpg", ".jpeg", ".png", ".webp", ".gif"): ext = ".png" fname = f"event_{secrets.token_hex(8)}{ext}" path = os.path.join(EVENTS_IMAGES_DIR, fname) req = Request(url, headers=HEADERS) with opener.open(req, timeout=15) as resp: with open(path, "wb") as f: f.write(resp.read()) return f"events_images/{fname}" except Exception as e: print(f" [WARN] Event image download failed: {e}") return None def get_event_ids_from_listing(opener) -> tuple[list[str], dict]: """Use JSON API to get all upcoming events — returns more than the HTML listing.""" import json as _json api_url = "https://www.mont.ru/ru-ru/apiMvc/events?eventPeriod=1&perPageCount=100" req = Request(api_url, headers=HEADERS) with opener.open(req, timeout=20) as r: data = _json.loads(r.read().decode("utf-8", errors="replace")) ids = [] id_to_img = {} id_to_date = {} for ev in data.get("events", []): eid = str(ev.get("eventId", "")) if not eid: continue ids.append(eid) img = ev.get("backgroundImageUrl", "") if img: id_to_img[eid] = img start = ev.get("start", "") if start: id_to_date[eid] = start[:10] # "2026-06-09T10:00:00" → "2026-06-09" return ids, id_to_img, id_to_date def fetch_and_save_event(opener, eid: str, listing_img: str = "", listing_date: str = "") -> tuple[bool, str, str]: from zkart_db_shim import is_event_saved, create_event if is_event_saved(eid): print(f" [SKIP] Event already exists: {eid}") return False, "", "" # Try API first api_url = f"{BASE_URL}/ru-ru/apiMvc/events/{eid}" req = Request(api_url, headers={**HEADERS, "Accept": "application/json, text/plain, */*"}) data = {} try: with opener.open(req, timeout=20) as r: data = json.loads(r.read().decode("utf-8", errors="replace")) except Exception: pass title = strip_tags(data.get("title", "") or data.get("name", "")).strip() body_html = data.get("text", "") or data.get("description", "") or "" body = sanitize_html(body_html) # Fallback: scrape article page if not title: try: req2 = Request(f"{BASE_URL}/ru-ru/events/{eid}", headers=HEADERS) with opener.open(req2, timeout=20) as r: pg = r.read().decode("utf-8", errors="replace") h1 = re.search(r']*>(.*?)', pg, re.DOTALL) if h1: title = strip_tags(h1.group(1)).strip() if not body: content_m = re.search(r']+class="[^"]*content[^"]*"[^>]*>(.*?)', pg, re.DOTALL | re.IGNORECASE) if content_m: body = sanitize_html(content_m.group(1)) # Try to get date from page if not listing_date: dm = re.search(r'(\d{1,2}[./]\d{1,2}[./]\d{4}|\d{1,2}\s+[а-яё]+\s+\d{4})', pg, re.IGNORECASE) if dm: listing_date = parse_event_date(dm.group(1)) or "" except Exception as e: print(f" [WARN] Event page fetch failed: {e}") if not title or len(title) < 4: print(f" [SKIP] No title for event {eid}") return False, "", "" print(f" [FETCH] Event: {title[:70]}...") # Date event_date = listing_date if not event_date: for field in ("date", "startDate", "start_date", "eventDate", "dateStart"): raw = data.get(field, "") if raw: event_date = parse_event_date(str(raw)) or "" if event_date: break if not event_date: event_date = datetime.date.today().strftime("%Y-%m-%d") # Image img_src = listing_img or data.get("image") or data.get("img") or data.get("previewImage") or "" image_path = None if img_src: image_path = download_event_image(opener, img_src) slug_base = slug_from(title, eid) conn = sqlite3.connect(DB_PATH, timeout=15) try: conn.execute( "INSERT INTO events(title, body, slug, image, event_date, published) VALUES (?,?,?,?,?,1)", (title, body, slug_base, image_path, event_date) ) conn.commit() print(f" [OK] Event saved: {title[:60]} ({event_date})") except sqlite3.IntegrityError: slug_base = f"{slug_base}-{secrets.token_hex(3)}" conn.execute( "INSERT INTO events(title, body, slug, image, event_date, published) VALUES (?,?,?,?,?,1)", (title, body, slug_base, image_path, event_date) ) conn.commit() finally: conn.close() time.sleep(0.4) return True, title, slug_base def hide_outdated_events() -> int: """Set published=0 for events where event_date <= today.""" conn = sqlite3.connect(DB_PATH, timeout=10) cur = conn.execute( "UPDATE events SET published=0 WHERE published=1 AND event_date <= date('now','localtime')" ) count = cur.rowcount conn.commit() conn.close() return count def parse_event_page(html: str) -> dict: """Extract body, register_url, image_src from events-details page HTML.""" import re as _re from html import unescape as _u # Description: events-details__about block body = "" about_m = _re.search( r'class="events-details__about[^"]*"[^>]*>.*?]*>(.*?)\s*\s*', html, _re.DOTALL ) if about_m: body = sanitize_html(about_m.group(1)) # Registration URL reg_m = _re.search(r'class="[^"]*register-btn[^"]*"[^>]+href="([^"]+)"', html, _re.IGNORECASE) _raw_reg = reg_m.group(1) if reg_m else "" if _raw_reg.startswith("/"): _raw_reg = "https://www.mont.ru" + _raw_reg register_url = _raw_reg # Cover background image cover_m = _re.search(r'events-details__background[^>]+style="background-image:\s*url\("([^&]+)"\)', html) img_src = cover_m.group(1) if cover_m else "" # Fallback: vendor logo if not img_src: logo_m = _re.search(r'events-details__logo[^>]*>.*?]+src="([^"]+)"', html, _re.DOTALL) if logo_m: img_src = logo_m.group(1) # Fallback: any /Content/Images if not img_src: ci_m = _re.search(r'src="(/Content/Images/[^"]+)"', html) if ci_m: img_src = ci_m.group(1) # Date from events-details__dates date_m = _re.search(r'events-details__dates[^>]*>.*?(\d{1,2}\.\d{2}\.\d{4})', html, _re.DOTALL) date_str = parse_event_date(date_m.group(1)) if date_m else "" return {"body": body, "register_url": register_url, "img_src": img_src, "date_str": date_str} def scrape_events(opener=None): print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M}] Scraping events...") def is_event_saved(eid): conn = sqlite3.connect(DB_PATH, timeout=10) row = conn.execute("SELECT id FROM events WHERE slug LIKE ?", (f"%-{eid}",)).fetchone() conn.close() return row is not None def save_event(title, body, slug, image_path, event_date, register_url): conn = sqlite3.connect(DB_PATH, timeout=15) try: conn.execute( "INSERT INTO events(title, body, slug, image, event_date, published, register_url) VALUES (?,?,?,?,?,1,?)", (title, body, slug, image_path, event_date, register_url) ) conn.commit() return slug except sqlite3.IntegrityError: s2 = f"{slug}-{secrets.token_hex(3)}" conn.execute( "INSERT INTO events(title, body, slug, image, event_date, published, register_url) VALUES (?,?,?,?,?,1,?)", (title, body, s2, image_path, event_date, register_url) ) conn.commit() return s2 finally: conn.close() if opener is None: try: opener = make_authenticated_opener() except Exception as e: msg = f"Ошибка авторизации (events): {e}" print(f" Auth error: {e}") return 0, [], msg try: ids, id_to_img, id_to_date = get_event_ids_from_listing(opener) except Exception as e: msg = f"Ошибка листинга мероприятий: {e}" print(f" Listing error: {e}") return 0, [], msg candidates = [eid for eid in ids if not is_event_saved(eid)] if not candidates: print(" No new events.") return 0, [], None print(f" Event candidates: {candidates}") saved_count = 0 published = [] for eid in candidates: # Fetch full event page HTML (contains all data) try: req = Request(f"{BASE_URL}/ru-ru/events/{eid}", headers=HEADERS) with opener.open(req, timeout=20) as r: pg = r.read().decode("utf-8", errors="replace") except Exception as e: print(f" [WARN] Could not fetch event page {eid}: {e}") continue parsed = parse_event_page(pg) body = parsed["body"] register_url = parsed["register_url"] img_src = parsed["img_src"] or id_to_img.get(eid, "") event_date = parsed["date_str"] or id_to_date.get(eid, "") # Title from h1 h1_m = re.search(r']*>(.*?)', pg, re.DOTALL) title = strip_tags(h1_m.group(1)).strip() if h1_m else "" if not title: title = strip_tags(re.search(r'events-details__title[^>]*>(.*?)]+>', pg, re.DOTALL).group(1)).strip() if re.search(r'events-details__title[^>]*>(.*?)]+>', pg, re.DOTALL) else "" if not title or len(title) < 4: print(f" [SKIP] No title for event {eid}") continue if "страница не найдена" in title.lower() or "404" in title: print(f" [SKIP] 404 for event {eid}") continue if not body: body = title # at minimum use title as body if not event_date: event_date = datetime.date.today().strftime("%Y-%m-%d") # Download image image_path = None if img_src: image_path = download_event_image(opener, img_src) slug = slug_from(title, eid) final_slug = save_event(title, body, slug, image_path, event_date, register_url) print(f" [OK] Event: {title[:60]} ({event_date}){' +reg' if register_url else ''}") saved_count += 1 published.append((title, final_slug)) time.sleep(0.4) return saved_count, published, None if __name__ == "__main__": main()