From aed9d240668c5cf205c32b651951e2b32201926f Mon Sep 17 00:00:00 2001 From: LHY0125 <3364451258@qq.com> Date: Tue, 3 Feb 2026 17:30:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .trae/rules/anconda.md | 2 + README.md | 46 +++++++ requirements.txt | 3 + 爬取抖音视频.py | 301 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 352 insertions(+) create mode 100644 .trae/rules/anconda.md create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 爬取抖音视频.py diff --git a/.trae/rules/anconda.md b/.trae/rules/anconda.md new file mode 100644 index 0000000..c6b8a70 --- /dev/null +++ b/.trae/rules/anconda.md @@ -0,0 +1,2 @@ +本文件夹下的规则是所有规则的基础,所有规则都必须继承自本文件夹下的规则。 +Python的环境为base(D:\ProgramData\anaconda3) \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0bb9bd3 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# 抖音批量下载工具 (GUI版) + +## 简介 +这是一个基于 Python 的抖音视频/图文批量下载工具。带有图形用户界面 (GUI),操作简单。它使用 `DrissionPage` 来自动化控制 Edge 浏览器获取数据,并支持下载视频和图文作品。 + +> **项目介绍视频**:[点击观看](https://www.douyin.com/user/self?modal_id=7602291788182721777&showTab=record) + +## 功能特点 +- **图形界面**:操作直观,无需命令行。 +- **批量下载**:支持指定下载数量。 +- **多类型支持**:自动识别并下载视频文件 (.mp4) 和图文图集 (文件夹保存)。 +- **自动命名**:文件按发布日期 `YYYY_MM_DD` 命名,同一天发布的多个作品会自动添加序号。 +- **实时日志**:界面显示运行日志,实时掌握下载进度。 + +## 环境要求 +- Windows 操作系统 +- Python 3.x +- **Microsoft Edge 浏览器** (必需,默认路径为 `C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe`) + +## 依赖库 +请确保安装以下 Python 库: +```bash +pip install requests DrissionPage +``` +*(tkinter 通常随 Python 安装自带)* + +## 使用说明 +1. **运行程序**: + 在终端中运行脚本: + ```bash + python 爬取抖音视频.py + ``` +2. **填写信息**: + - **作者主页链接**:粘贴你要下载的抖音博主主页链接。 + - **爬取视频个数**:输入要下载的作品数量(按发布时间倒序获取,下载时按正序保存)。 + - **保存路径**:点击“选择文件夹”选择保存位置。 +3. **开始下载**: + 点击“开始下载”按钮。程序将自动启动 Edge 浏览器并开始扫描。 + > **注意**:程序运行时会弹出 Edge 浏览器窗口,请**不要关闭**它,程序需要通过它来获取数据。 +4. **完成**: + 下载完成后会有弹窗提示,并可在日志窗口看到详细结果。 + +## 注意事项 +- 本工具依赖本地 Edge 浏览器,请确保路径正确。如果你的 Edge 安装在其他位置,请修改代码中的 `DEFAULT_BROWSER_PATH` 变量。 +- 爬取过程中请保持网络畅通。 +- 本工具仅供学习交流使用,请勿用于非法用途。 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3d4b83a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +要求的库: +requests +DrissionPage diff --git a/爬取抖音视频.py b/爬取抖音视频.py new file mode 100644 index 0000000..1fe9f85 --- /dev/null +++ b/爬取抖音视频.py @@ -0,0 +1,301 @@ +import os +import time +import requests +import threading +import tkinter as tk +from tkinter import filedialog, messagebox, scrolledtext +from datetime import datetime +from DrissionPage import ChromiumPage, ChromiumOptions + +# python "D:\Code\doing_exercises\programs\crawl_tiktok_video\爬取抖音视频.py" + +# ================= 配置区域 ================= +# 这里写死了 Edge 的路径 +DEFAULT_BROWSER_PATH = r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe" + + +# =========================================== + + +class DouyinDownloaderApp: + def __init__(self, root): + self.root = root + self.root.title("抖音批量下载工具 (GUI版)") + self.root.geometry("600x550") + + # 界面布局变量 + self.url_var = tk.StringVar() + self.count_var = tk.StringVar(value="10") + self.save_path_var = tk.StringVar() + self.is_running = False + + self.create_widgets() + + def create_widgets(self): + # 1. 主页链接 + tk.Label(self.root, text="1. 作者主页链接:").pack( + anchor="w", padx=10, pady=(10, 0) + ) + entry_url = tk.Entry(self.root, textvariable=self.url_var, width=60) + entry_url.pack(padx=10, pady=5, fill="x") + + # 2. 爬取数量 + tk.Label(self.root, text="2. 爬取视频个数:").pack( + anchor="w", padx=10, pady=(10, 0) + ) + entry_count = tk.Entry(self.root, textvariable=self.count_var, width=60) + entry_count.pack(padx=10, pady=5, fill="x") + + # 3. 保存路径 + tk.Label(self.root, text="3. 保存路径:").pack(anchor="w", padx=10, pady=(10, 0)) + frame_path = tk.Frame(self.root) + frame_path.pack(padx=10, pady=5, fill="x") + + # === 修正点:readOnly=True 改为 state='readonly' === + entry_path = tk.Entry( + frame_path, textvariable=self.save_path_var, state="readonly" + ) + entry_path.pack(side="left", fill="x", expand=True) + + btn_browse = tk.Button( + frame_path, text="选择文件夹", command=self.select_folder + ) + btn_browse.pack(side="right", padx=(5, 0)) + + # 4. 开始按钮 + self.btn_start = tk.Button( + self.root, + text="开始下载", + command=self.start_thread, + bg="#4CAF50", + fg="white", + font=("Arial", 12, "bold"), + ) + self.btn_start.pack(pady=15, fill="x", padx=50) + + # 5. 日志输出窗口 + tk.Label(self.root, text="运行日志:").pack(anchor="w", padx=10) + self.log_text = scrolledtext.ScrolledText( + self.root, height=15, state="disabled" + ) + self.log_text.pack(padx=10, pady=5, fill="both", expand=True) + + def log(self, message): + """向日志窗口输出信息""" + self.log_text.config(state="normal") + self.log_text.insert(tk.END, message + "\n") + self.log_text.see(tk.END) # 滚动到底部 + self.log_text.config(state="disabled") + + def select_folder(self): + """选择文件夹对话框""" + folder_selected = filedialog.askdirectory() + if folder_selected: + self.save_path_var.set(folder_selected) + + def start_thread(self): + """在独立线程中运行,防止界面卡死""" + if self.is_running: + messagebox.showwarning("提示", "任务正在进行中,请稍候...") + return + + # 验证输入 + url = self.url_var.get().strip() + count_str = self.count_var.get().strip() + save_path = self.save_path_var.get().strip() + + if not url: + messagebox.showerror("错误", "请输入主页链接") + return + if not count_str.isdigit() or int(count_str) <= 0: + messagebox.showerror("错误", "请输入正确的数量") + return + if not save_path: + messagebox.showerror("错误", "请选择保存路径") + return + + # 检查浏览器路径是否存在 + if not os.path.exists(DEFAULT_BROWSER_PATH): + messagebox.showerror( + "错误", + f"未找到浏览器文件:\n{DEFAULT_BROWSER_PATH}\n\n请确认已安装Edge或修改代码中的路径配置。", + ) + return + + self.is_running = True + self.btn_start.config(state="disabled", text="正在运行...") + self.log_text.config(state="normal") + self.log_text.delete(1.0, tk.END) # 清空日志 + self.log_text.config(state="disabled") + + # 开启线程 + thread = threading.Thread( + target=self.run_task, args=(url, int(count_str), save_path) + ) + thread.daemon = True + thread.start() + + def download_file(self, url, filepath): + """下载文件逻辑""" + try: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Referer": "https://www.douyin.com/", + } + for _ in range(3): + try: + response = requests.get( + url, headers=headers, stream=True, timeout=20 + ) + if response.status_code == 200: + with open(filepath, "wb") as f: + for chunk in response.iter_content(chunk_size=1024 * 1024): + f.write(chunk) + return True + break + except requests.exceptions.RequestException: + time.sleep(1) + continue + return False + except Exception as e: + self.log(f"下载出错: {e}") + return False + + def run_task(self, target_url, target_count, save_root): + """核心业务逻辑""" + dp = None + try: + self.log(f"正在启动 Edge 浏览器 ({DEFAULT_BROWSER_PATH})...") + + co = ChromiumOptions() + co.set_paths(browser_path=DEFAULT_BROWSER_PATH) + + # 尝试启动浏览器 + dp = ChromiumPage(addr_or_opts=co) + + # 开始监听 + dp.listen.start("aweme/v1/web/aweme/post") + + self.log(f"正在访问: {target_url}") + dp.get(target_url) + + collected_works = [] + self.log("正在扫描作品列表 (请不要关闭弹出的浏览器)...") + + no_new_data_count = 0 + + while len(collected_works) < target_count: + dp.scroll.to_bottom() + + # 等待数据包 + res = dp.listen.wait(timeout=2) + + found_new = False + if res: + try: + data = res.response.body + if data and "aweme_list" in data: + aweme_list = data["aweme_list"] + if aweme_list: + for aweme in aweme_list: + if not any( + w["aweme_id"] == aweme["aweme_id"] + for w in collected_works + ): + collected_works.append(aweme) + found_new = True + except: + pass + + self.log(f"已获取作品信息: {len(collected_works)}/{target_count}") + + if len(collected_works) >= target_count: + break + + if not found_new: + no_new_data_count += 1 + time.sleep(1) + else: + no_new_data_count = 0 + + if no_new_data_count > 8: + self.log("未检测到新数据,可能已到底部。") + break + + self.log(f"扫描完成,共获取 {len(collected_works)} 个作品。") + dp.close() # 关闭浏览器 + + # 处理数据 + works_to_process = collected_works[:target_count] + # 按时间正序 + works_to_process.sort(key=lambda x: x["create_time"]) + + self.log("开始下载...") + date_counter = {} + + for index, work in enumerate(works_to_process): + try: + ts = work["create_time"] + date_str = datetime.fromtimestamp(ts).strftime("%Y_%m_%d") + + if date_str not in date_counter: + date_counter[date_str] = 1 + file_name_base = date_str + else: + date_counter[date_str] += 1 + count_idx = date_counter[date_str] + file_name_base = f"{date_str}({count_idx})" + + is_video = True + if "images" in work and work["images"]: + is_video = False + + self.log( + f"[{index + 1}/{len(works_to_process)}] {file_name_base} | {'视频' if is_video else '图文'}" + ) + + if is_video: + video_url = work["video"]["play_addr"]["url_list"][0] + file_path = os.path.join(save_root, f"{file_name_base}.mp4") + if not os.path.exists(file_path): + self.download_file(video_url, file_path) + else: + self.log(" -> 文件已存在,跳过") + else: + img_folder = os.path.join(save_root, file_name_base) + if not os.path.exists(img_folder): + os.makedirs(img_folder) + + images = work["images"] + for idx, img_obj in enumerate(images): + img_url = img_obj["url_list"][0] + img_name = f"{idx + 1}.png" + img_path = os.path.join(img_folder, img_name) + if not os.path.exists(img_path): + self.download_file(img_url, img_path) + self.log(" -> 图文下载完成") + + except Exception as e: + self.log(f" -> 处理出错: {e}") + continue + + self.log("=" * 30) + self.log("全部任务结束!") + messagebox.showinfo("完成", "全部下载任务已结束!") + + except Exception as e: + self.log(f"发生严重错误: {e}") + if dp: + try: + dp.close() + except: + pass + finally: + self.is_running = False + self.btn_start.config(state="normal", text="开始下载") + + +if __name__ == "__main__": + root = tk.Tk() + app = DouyinDownloaderApp(root) + root.mainloop()