Рабочая версия от 11 октября — комплект IM_settings: JS, Server, Scraper, Generator 50. AI Консультант — Ввод данных
🌞
# =============================================================================
# Working set (Oct 11): Server 5003 + Scraper + Generator 50.
#
# The three sections below correspond to three modules of the original
# project: the Flask server, IM_site_scraper10 (scraper), and
# IM_generator_test_50 (keyword generator). They are kept in one file here
# to mirror the pasted document; the cross-module imports reflect the real
# on-disk layout. NOTE(review): if this is ever run as a single file, the
# `from IM_site_scraper10 import ...` line must be removed — confirm layout.
# =============================================================================

# -----------------------------------------------------------------------------
# Section 1: Flask server on port 5003.
# -----------------------------------------------------------------------------
from flask import Flask, request, jsonify
from IM_site_scraper10 import scrape_site_urls
from flask_cors import CORS
import glob
import json
import os
from urllib.parse import urlparse

app = Flask(__name__)
CORS(app)

# Active crawl generators, keyed by filesystem-safe domain ("example_com").
generators = {}


def _normalize_site(site: str) -> str:
    """Prepend "https://" when *site* lacks a scheme."""
    return site if site.startswith("http") else "https://" + site


def _domain_key(site: str) -> str:
    """Return the netloc of *site* with dots replaced by underscores.

    Used to build all per-site file names (IM_<domain>.json etc.).
    """
    return urlparse(site).netloc.replace(".", "_")


@app.route("/submit-site", methods=["POST"])
def submit_site():
    """Start a fresh crawl for a site.

    Wipes old artifacts for the domain, creates a lazy URL generator,
    and writes an empty page-list file. Returns the domain key to the client.
    """
    data = request.get_json()
    site = _normalize_site(data.get("site", "").strip())
    max_pages = int(data.get("max_pages", 50))
    try:
        domain = _domain_key(site)
        # Remove stale files from previous runs so results do not mix.
        for pattern in (f"IM_{domain}.json",
                        f"IM_{domain}_texts_*.json",
                        f"IM_DNA_{domain}.json"):
            for file in glob.glob(pattern):
                try:
                    os.remove(file)
                except OSError:
                    pass  # best-effort cleanup; a locked file is not fatal
        # Lazy generator: each next() fetches and yields one crawled URL.
        generators[domain] = scrape_site_urls(site, max_pages=max_pages)
        # Start with an empty page list; /next-page fills it incrementally.
        with open(f"IM_{domain}.json", "w", encoding="utf-8") as f:
            json.dump([], f, ensure_ascii=False, indent=2)
        return jsonify({"status": "ok", "message": "Генератор создан",
                        "domain": domain})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route("/next-page", methods=["POST"])
def next_page():
    """Advance the crawl for a site by one page and return its URL."""
    data = request.get_json()
    site = _normalize_site(data.get("site", "").strip())
    domain = _domain_key(site)
    gen = generators.get(domain)
    if not gen:
        return jsonify({"status": "error",
                        "message": "Генератор не найден"}), 400
    try:
        page = next(gen)
        return jsonify({"status": "ok", "page": page})
    except StopIteration:
        # Crawl exhausted: drop the generator so the client gets "done".
        generators.pop(domain, None)
        return jsonify({"status": "done", "message": "Страницы закончились"})


@app.route("/submit-pages", methods=["POST"])
def submit_pages():
    """Merge selected + extra URLs into IM_<domain>.json and run keywording."""
    data = request.get_json()
    site = data.get("site", "").strip()
    selected_urls = data.get("selectedUrls", [])
    extra_urls = data.get("extraUrls", [])
    if len(extra_urls) > 5:
        return jsonify({"status": "error",
                        "message": "Too many extra URLs"}), 400
    # BUGFIX: validate before prepending the scheme; the original prepended
    # "https://" first, so the empty-site branch could never fire.
    if not site:
        return jsonify({"status": "error", "message": "Site is required"}), 400
    site = _normalize_site(site)
    domain = _domain_key(site)
    output_file = f"IM_{domain}.json"
    try:
        # Re-read any previously selected pages.
        existing_selected = []
        if os.path.exists(output_file):
            with open(output_file, "r", encoding="utf-8") as f:
                existing_selected = json.load(f)
        # New selection overrides the stored one; otherwise keep the old list.
        current_selected = selected_urls if selected_urls else existing_selected
        # Deduplicate the union (order is not preserved, matching original).
        combined_urls = list(set(current_selected + extra_urls))
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(combined_urls, f, ensure_ascii=False, indent=2)
        # Kick off keyword extraction over the final URL list.
        from IM_generator_test_50 import process_urls
        process_urls(output_file, domain)
        return jsonify({"status": "ok",
                        "message": f"{len(combined_urls)} URLs saved."})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route("/submit-userinfo", methods=["POST"])
def save_user_info():
    """Persist the raw user-info JSON payload to IM_<domain>_info.json."""
    data = request.get_json()
    if not data:
        return jsonify({"status": "error", "message": "No JSON received"}), 400
    site = data.get("site", "").strip()
    if not site:
        return jsonify({"status": "error", "message": "Site is required"}), 400
    site = _normalize_site(site)
    domain = _domain_key(site)
    output_file = f"IM_{domain}_info.json"
    try:
        # Store the payload verbatim.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return jsonify({"status": "ok",
                        "message": f"User info saved to {output_file}"})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route("/text", methods=["POST"])
def save_text():
    """Save user text into one of 5 rotating slot files, then re-keyword.

    Slots are IM_<domain>_texts_1..5.json; when all are taken the oldest
    (by creation time) is overwritten.
    """
    data = request.get_json()
    site = data.get("site", "").strip()
    text = data.get("text", "").strip()
    # BUGFIX: validate before prepending the scheme (same dead-check issue
    # as /submit-pages in the original).
    if not site or not text:
        return jsonify({"status": "error",
                        "message": "Site and text are required"}), 400
    site = _normalize_site(site)
    domain = _domain_key(site)
    folder = "."
    max_files = 5
    existing_files = [
        os.path.join(folder, f"IM_{domain}_texts_{i}.json")
        for i in range(1, max_files + 1)
        if os.path.isfile(os.path.join(folder, f"IM_{domain}_texts_{i}.json"))
    ]
    if len(existing_files) >= max_files:
        # All slots taken: overwrite the oldest file.
        target_file = min(existing_files, key=os.path.getctime)
    else:
        # Take the first free slot number.
        for i in range(1, max_files + 1):
            candidate = os.path.join(folder, f"IM_{domain}_texts_{i}.json")
            if candidate not in existing_files:
                target_file = candidate
                break
    with open(target_file, "w", encoding="utf-8") as f:
        json.dump({"text": text}, f, ensure_ascii=False, indent=2)
    # Regenerate keywords so the new text is included in the DNA file.
    from IM_generator_test_50 import process_urls
    process_urls(f"IM_{domain}.json", domain)
    return jsonify({"status": "ok",
                    "message": f"Text saved to {os.path.basename(target_file)}"})


# -----------------------------------------------------------------------------
# Section 2: scraper module (IM_site_scraper10 / site_scraper.py).
# -----------------------------------------------------------------------------
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urldefrag
from collections import deque


def is_static_url(url):
    """Return False for URLs that look like cart/session/account actions."""
    blacklist_params = [
        'add-to-cart', 'cart', 'checkout', 'order', 'session', 'payment',
        'login', 'register', 'signup', 'profile', 'user/settings',
        'subscribe', 'unsubscribe', 'track', 'wishlist'
    ]
    parsed = urlparse(url)
    # Check both the query string and the path for blacklisted fragments.
    if parsed.query:
        for param in blacklist_params:
            if param in parsed.query.lower():
                return False
    path_lower = parsed.path.lower()
    for param in blacklist_params:
        if param in path_lower:
            return False
    return True


def is_valid_url(url, root_url):
    """Return True when *url* is an in-site, crawlable content page.

    Rejects: other schemes/hosts, URLs outside the *root_url* prefix,
    CDN/analytics hosts, mailto:/tel:/javascript: links, robots/sitemap,
    binary/asset extensions, and non-static (cart/session) URLs.
    """
    excluded_exts = {
        ".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp",
        ".zip", ".rar", ".7z", ".tar", ".gz",
        ".mp4", ".avi", ".mov", ".mp3", ".wav", ".webm", ".mkv",
        ".exe", ".msi", ".apk", ".bat", ".sh",
        ".css", ".js", ".ts", ".php", ".asp", ".aspx", ".dll", ".bin"
    }
    excluded_files = {"robots.txt", "sitemap.xml"}
    parsed = urlparse(url)
    parsed_root = urlparse(root_url)
    if parsed.scheme not in ["http", "https"]:
        return False
    if parsed.netloc != parsed_root.netloc:
        return False
    # Stay under the exact root prefix (excludes sibling paths).
    if not url.startswith(root_url):
        return False
    excluded_domains = (
        'fonts.googleapis.com', 'cdn.', 'cloudfront.net', 'analytics.', 'api.'
    )
    if any(domain in parsed.netloc.lower() for domain in excluded_domains):
        return False
    excluded_prefixes = ('#', 'javascript:void(0)', 'mailto:', 'tel:')
    url_lower = url.lower()
    if url_lower.startswith(excluded_prefixes):
        return False
    if parsed.path.split('/')[-1].lower() in excluded_files:
        return False
    path = parsed.path.lower()
    ext = '.' + path.split('.')[-1] if '.' in path else ''
    if ext in excluded_exts:
        return False
    if not is_static_url(url):
        return False
    return True


def scrape_site_urls(site_url, max_pages=50):
    """Breadth-first crawl of *site_url*, yielding each accepted page URL.

    Accepts only text/html and application/pdf responses. Progress is
    persisted to IM_<domain>.json after every accepted page so consumers
    can read partial results while the crawl runs. Yields URLs one at a
    time (drives the server's /next-page endpoint).
    """
    visited = set()
    to_visit = deque([site_url])
    collected = []
    parsed_root = urlparse(site_url)
    domain_safe = parsed_root.netloc.replace('.', '_')
    output_file = f"IM_{domain_safe}.json"
    while to_visit and len(collected) < max_pages:
        current_url = to_visit.popleft()
        # Drop any #fragment so the same page is not visited twice.
        current_url, _ = urldefrag(current_url)
        if current_url in visited:
            continue
        visited.add(current_url)
        print(f"[TRY] {current_url}")
        try:
            response = requests.get(
                current_url,
                timeout=(3, 5),  # (connect, read) seconds
                headers={"User-Agent": "Mozilla/5.0"}
            )
            content_type = response.headers.get("Content-Type", "").lower()
            if "text/html" not in content_type and "application/pdf" not in content_type:
                continue
            collected.append(current_url)
            # Persist progress after every accepted page.
            try:
                with open(output_file, "w", encoding="utf-8") as f:
                    json.dump(collected, f, ensure_ascii=False, indent=2)
            except Exception as e:
                print(f"[ERROR] Saving JSON: {e}")
            if "text/html" in content_type:
                soup = BeautifulSoup(response.text, "html.parser")
                for link_tag in soup.find_all("a", href=True):
                    href = link_tag['href']
                    full_url = urljoin(current_url, href)
                    # Defragment before the visited check as well.
                    full_url, _ = urldefrag(full_url)
                    if is_valid_url(full_url, site_url) and full_url not in visited:
                        to_visit.append(full_url)
            yield current_url
        except requests.RequestException as e:
            print(f"[ERROR] {current_url}: {e}")
            continue
    # Final save covers the case where the loop ends between page saves.
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(collected, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"[ERROR] Final JSON save: {e}")
    return collected


# -----------------------------------------------------------------------------
# Section 3: keyword generator (IM_generator_test_50).
# -----------------------------------------------------------------------------
from bs4 import Comment
from PyPDF2 import PdfReader
import tempfile
import openai
import tiktoken

# SECURITY: the original source embedded a live OpenAI API key here.
# That key is compromised and must be revoked; read it from the
# environment instead of hardcoding it.
openai.api_key = os.environ.get("OPENAI_API_KEY", "")

ENCODING = tiktoken.get_encoding("cl100k_base")
MAX_TOKENS_PER_DOC = 3000    # max tokens of document text fed to the model
MAX_RESPONSE_TOKENS = 500    # max tokens allowed in the model's reply


def truncate_text_by_tokens(text: str, max_tokens: int) -> str:
    """Return *text* cut to at most *max_tokens* cl100k_base tokens."""
    tokens = ENCODING.encode(text)
    if len(tokens) > max_tokens:
        tokens = tokens[:max_tokens]
        return ENCODING.decode(tokens)
    return text


def extract_text_from_url(url):
    """Download *url* and return its plain text; "" on any failure.

    PDFs are spooled to a temp file and read with PyPDF2; HTML is
    stripped of script/style/nav/etc. tags and comments; anything else
    is returned as the raw response text.
    """
    try:
        resp = requests.get(url)
        content_type = resp.headers.get("Content-Type", "")
        if "application/pdf" in content_type or url.lower().endswith(".pdf"):
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
                tmp.write(resp.content)
                tmp_path = tmp.name
            reader = PdfReader(tmp_path)
            text = ""
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
            os.remove(tmp_path)
            return text
        elif "text/html" in content_type:
            soup = BeautifulSoup(resp.text, "html.parser")
            # Drop non-content tags and HTML comments before extracting text.
            for tag in soup(["script", "style", "noscript", "header",
                             "footer", "nav", "form"]):
                tag.decompose()
            for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
                comment.extract()
            raw_text = soup.get_text(separator=" ", strip=True)
            clean_text = " ".join(raw_text.split())
            return clean_text
        else:
            return resp.text
    except Exception:
        return ""


def clean_text(text):
    """Drop words of 3 characters or fewer (crude stop-word filter)."""
    words = text.split()
    filtered = [w for w in words if len(w) > 3]
    return " ".join(filtered)


def get_top_keywords(text, top_n=40):
    """Ask the model for up to *top_n* keywords extracted from *text*.

    The text is token-truncated first so the request stays within limits.
    Returns a list of keyword strings with list numbers stripped.
    """
    # Truncate the text so we do not exceed the token limit.
    truncated_text = truncate_text_by_tokens(text, MAX_TOKENS_PER_DOC)
    prompt = (
        "Respond only in English. "
        "Analyze the provided text and identify 40 distinctive keywords. "
        "For each word, if possible, add a synonym (for example, 'price — cost'). "
        "Place the main title on the top line. "
        "Place the product name, characteristic terms, company names, and other proper nouns at the top of the list. "
        "If a word is significant or appears frequently, also move it closer to the top. "
        "Do not repeat selected keywords. "
        "After placing the main title, product name, and unique terms, arrange the remaining keywords in descending order of importance and uniqueness. "
        "Section or chapter titles are important keywords; keep their original titles and place them at the top of the list. "
        "Present the result as a simple numbered list in English only. "
        "Here is the text:\n\n" + truncated_text
    )
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=MAX_RESPONSE_TOKENS,
        temperature=0.3,
    )
    keywords_text = response.choices[0].message.content.strip()
    keywords = []
    for line in keywords_text.split("\n"):
        line = line.strip()
        if line:
            # Strip a leading "N." list number when present.
            kw = line.split(".", 1)[-1].strip() if "." in line[:3] else line
            keywords.append(kw)
    if len(keywords) > top_n:
        keywords = keywords[:top_n]
    return keywords


def process_urls(json_filename, site_name, max_words=40):
    """Build the DNA keyword file for a site.

    Reads the URL list from *json_filename*, extracts keywords per URL and
    per locally uploaded text file (IM_<site>_texts_1..5.json), and writes
    the combined mapping to IM_DNA_<site_name>.json. Returns that filename.
    """
    with open(json_filename, "r", encoding="utf-8") as f:
        urls = json.load(f)
    output = {}

    def split_lines_unique(keywords_list, max_words):
        """Keep at most *max_words* globally-unique (case-insensitive)
        words, preserving the original line grouping."""
        seen = set()
        total_words = 0
        result = []
        for line in keywords_list:
            words = line.replace('—', '').split()  # drop em-dash separators
            line_words = []
            for w in words:
                lw = w.lower()
                if lw not in seen:
                    line_words.append(w)
                    seen.add(lw)
                    total_words += 1
                    if total_words >= max_words:
                        break
            if line_words:
                # BUGFIX: the original called result.append("", "".join(...)),
                # a TypeError (list.append takes exactly one argument).
                result.append(" ".join(line_words))
            if total_words >= max_words:
                break
        return result

    # --- Crawled URLs ---
    for url in urls:
        raw_text = extract_text_from_url(url)
        clean = clean_text(raw_text)
        keywords = get_top_keywords(clean)
        output[url] = split_lines_unique(keywords, max_words)
        print(f"Обработана секция: {url}")

    # --- Locally uploaded texts ---
    domain = site_name
    for i in range(1, 6):
        filename = f"IM_{domain}_texts_{i}.json"
        if os.path.isfile(filename):
            try:
                with open(filename, "r", encoding="utf-8") as f_local:
                    data = json.load(f_local)
                text = data.get("text", "")
                if text:
                    clean = clean_text(text)
                    keywords = get_top_keywords(clean)
                    # BUGFIX: key each uploaded file by its own name; the
                    # pasted source had a garbled placeholder that collapsed
                    # all five files onto one dictionary key.
                    output[f"file: {filename}"] = split_lines_unique(keywords, max_words)
                    print(f"Обработана секция: file: {filename}")
            except Exception:
                pass  # best-effort: skip unreadable slot files

    # --- Write the final DNA file ---
    output_filename = f"IM_DNA_{site_name}.json"
    with open(output_filename, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    return output_filename


# Entry point kept at the very end so that, when the sections are combined
# into one file, every definition exists before the (blocking) server starts.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5003, debug=True)