1
1
mirror of https://github.com/neosubhamoy/pytubepp.git synced 2026-02-04 18:22:23 +05:30

64 Commits

12 changed files with 1044 additions and 500 deletions

View File

@@ -18,15 +18,14 @@ jobs:
- name: 🐍 Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.12'
- name: 📦 Install dependencies
run: |
pip install -r requirements.txt
pip install wheel twine
- name: 🛠️ Build package
run: python3 setup.py sdist bdist_wheel
run: python3 -m build
- name: 🚀 Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
# Compiled python modules.
# Compiled python modules and cache.
__pycache__/
*.pyc
# Setuptools distribution folder.

View File

124
README.md
View File

@@ -1,23 +1,33 @@
# pytubePP - (Pytube Post Processor)
# PytubePP - (Pytube Post Processor)
### A Simple CLI Tool to Download Your Favourite YouTube Videos Effortlessly!
[![status](https://img.shields.io/badge/status-active-brightgreen.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![verion](https://img.shields.io/badge/version-v1.0.1_stable-yellow.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![python](https://img.shields.io/badge/python-v3.12.x-blue?logo=python&style=flat)](https://www.python.org/downloads/)
[![PypiDownloads](https://img.shields.io/pypi/dm/pytubepp?color=brightgreen)](https://pypi.org/project/pytubepp/)
[![PypiVersion](https://img.shields.io/pypi/v/pytubepp?color=yellow)](https://pypi.org/project/pytubepp/)
[![python](https://img.shields.io/badge/python-v3.13-blue?logo=python&style=flat)](https://www.python.org/downloads/)
[![builds](https://img.shields.io/badge/builds-passing-brightgreen.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![PRs](https://img.shields.io/badge/PRs-welcome-blue.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
😀 GOOD NEWS: If you are not a power user and don't want to bother remembering PytubePP Commands! (You are not familier with Command Line Tools). We recently released a Browser Extension that can auto detect YouTube Videos and You can download the Video in one click directly from the browser using PytubePP CLI. Install [PytubePP Helper](https://github.com/neosubhamoy/pytubepp-helper) app in your System and add [PytubePP Extension](https://github.com/neosubhamoy/pytubepp-extension) in your Browser to get started.
> **🥰 Liked this project? Please consider giving it a Star (🌟) on github to show us your appreciation and help the algorythm recommend this project to even more awesome people like you!**
### **🏷️ Features**
* Auto Post-Process & Merge YouTube DASH Streams
* Supports upto 8K 60fps HDR Stream Download
* Supports MP3 Download (with Embeded Thumbnail and Tags)
* Supports Embeded Captions
* Smart Stream Selection
* Highly Configurable and Many More 😉
### **🧩 Dependencies**
* [pytube](https://pypi.org/project/pytube/)
* [FFmpeg (Not Pre-Included)](https://ffmpeg.org/)
### **📎 Pre-Requirements**
* [Python](https://www.python.org/downloads/) (>=3.8)
* [FFmpeg](https://ffmpeg.org/)
* [Node.js](https://nodejs.org/en/download/) (required for auto YT poToken genration which is currently not possible in Python environment)
### **🧩 Python Dependencies**
* [pytubefix](https://pypi.org/project/pytubefix/)
* [ffmpy](https://pypi.org/project/ffmpy/)
* [mutagen](https://pypi.org/project/mutagen/)
* [tabulate](https://pypi.org/project/tabulate/)
@@ -27,56 +37,124 @@
* [setuptools](https://pypi.org/project/setuptools/)
### **🛠️ Installation**
You can install pytubePP in your system via PIP by simply running the below command
1. Install Python and PIP
- Linux (Debian): Python is pre-installed install PIP using `sudo apt install python3-pip`<br>
- Linux (Fedora): Python is pre-installed install PIP using `sudo dnf install python3-pip`<br>
- Linux (Arch): Python is pre-installed install PIP using `sudo pacman -S python-pip`<br>
- Windows (10/11): `winget install Python.Python.3.13`<br>
- MacOS (using Homebrew): `brew install python`<br>
- Android (using Termux): `pkg install python`
2. Install FFmpeg
- Linux (Debian): `sudo apt install ffmpeg`<br>
- Linux (Fedora): `sudo dnf install ffmpeg-free`<br>
- Linux (Arch): `sudo pacman -S ffmpeg`<br>
- Windows (10/11): `winget install ffmpeg`<br>
- MacOS (using Homebrew): `brew install ffmpeg`<br>
- Android (using Termux): `pkg install ffmpeg`
3. Install Node.js
- Linux (Debian): `curl -o- https://fnm.vercel.app/install | bash && fnm install --lts && fnm use lts`<br>
- Linux (Fedora): `curl -o- https://fnm.vercel.app/install | bash && fnm install --lts && fnm use lts`<br>
- Linux (Arch): `curl -o- https://fnm.vercel.app/install | bash && fnm install --lts && fnm use lts`<br>
- Windows (10/11): `winget install Schniz.fnm;fnm install --lts;fnm use lts`<br>
- MacOS (using Homebrew): `brew install node`<br>
- Android (using Termux): `pkg install nodejs`
4. Install PytubePP (using PIP)
> Use `pip3` command instead of `pip` if you are on Linux or MacOS.
> Use `--break-system-packages` flag to install 'PytubePP' in global environment if you get `error: externally-managed-environment` while installing in Linux or MacOS (Don't worry it will not break your system packages, it's just a security mesure)
```terminal
pip install pytubepp
```
**IMPORTANT: Before installing pytubePP make sure FFmpeg is installed in your system and accesable via your cli interface (FFmpeg is Must Required as some of the core features of pytubePP relies on FFmpeg, but due to security reasons we can not ship it with the module)**
**>> Install FFmpeg (If you haven't already!)**
**NOTE: Always make sure 'PytubePP' and 'Pytubefix' is on the latest version to avoid issues (update them at least once a week) (Use the command below to update)**
Linux (Ubuntu): `apt install ffmpeg`<br>
Windows (using Chocolatey): `choco install ffmpeg`<br>
MacOS (using Homebrew): `brew install ffmpeg`<br>
Android (using Termux): `pkg install ffmpeg`
```
pip install pytubefix pytubepp --upgrade
```
**UNINSTALL: If you want to uninstall PytubePP (Use the command below to uninstall) NOTE: it will only remove the 'PytubePP' python package**
```
pip uninstall pytubepp -y
```
### **📌 Commands and Flags**
Using pytubePP is as simple as just supplying it only the YouTube video url as argument!
** Before Starting Please NOTE: pytubePP follows a simple rule - "Use the Default Download Configuration if No Flags are Passed"
* To download a video in maximum available resolution the command will look like:
Using PytubePP is as simple as just supplying it only the YouTube video url as argument!
> Before starting please NOTE: PytubePP follows a simple principle -> `Use Default Configuration if No Flags are Passed`
* To download a video in default configuration (maximum resolution and without any caption by default) the command will look like:
```terminal
pytubepp "https://youtube.com/watch?v=2lAe1cqCOXo"
```
> NOTE: This command will behave differently if you have changed default configurations
* To download the video in a specific resolution (suppose 480p) the command will be:
```terminal
pytubepp "https://youtube.com/watch?v=2lAe1cqCOXo" -s 480p
```
> NOTE: PytubePP always uses default configuration of flags if they are not passed for example if you only pass `-s` flag then it will use the default caption along with it, if you only pass `-c` then it will use default stream and vice versa
* To download the video with embeded caption (suppose en - English) the command will be:
```terminal
pytubepp "https://youtube.com/watch?v=2lAe1cqCOXo" -c en
```
> NOTE: You can override and disable default caption for the current video if you pass `-c none`
* To download the video in audio-only MP3 format the command will be:
```terminal
pytubepp "https://youtube.com/watch?v=2lAe1cqCOXo" -s mp3
```
* To fetch the video information the command will be:
```terminal
pytubepp "https://youtube.com/watch?v=2lAe1cqCOXo" -i
```
* To cancel/stop an ongoing download press `CTRL` + `C` on keyboard (it is recommended to run the `-ct` flag once after canceling an ongoing download).
* List of all available flags are given below:
| Flag | Usage | Requires Parameter | Requires URL | Parameters | Default |
| :--- | :--- | :--- | :--- | :--- | :--- |
| -s | Choose preferred download stream | YES | YES | `144` `144p` `240` `240p` `360` `360p` `480` `480p` `720` `720p` `hd` `1080` `1080p` `fhd` `1440` `1440p` `2k` `2160` `2160p` `4k` `mp3` (Pass any one of them) | Your chosen Default Stream via `-ds` flag |
| -i | Shows the video information like: Title, Author, Views, Available Download Streams | NO | YES | No parameters | No default |
| -ds | Set default download stream | YES | NO | `144p` `240p` `360p` `480p` `720p` `1080p` `1440p` `2160p` `mp3` `max` (Pass any one of them) | `max` |
| -df | Set custom download folder path | YES | NO | Use the full path excluding the last trailing slash within double quotes eg(in Linux): `"/path/to/folder"` (Make sure the folder path you enterted is already created and accessable) | Within `Pytube Downloads` folder in your System's `Downloads` folder |
| -s | Choose preferred download stream | YES | YES | `144` `144p` `240` `240p` `360` `360p` `480` `480p` `720` `720p` `hd` `1080` `1080p` `fhd` `1440` `1440p` `2k` `2160` `2160p` `4k` `4320` `4320p` `8k` `mp3` (Pass any one of them) | Your chosen Default Stream via `-ds` flag |
| -c | Choose preferred caption | YES | YES | All [ISO 639-1 Language Codes](https://www.w3schools.com/tags/ref_language_codes.asp) + some others (Pass any one of them) + `none` for No Caption eg: `en` for English | Your chosen Default Caption via `-dc` flag |
| -i | Shows the video information like: Title, Author, Views, Publication Date, Duration, Available Download Streams | NO | YES | No parameters | No default |
| -ls | Lists all available streams (video, audio, caption) (only for debuging purposes) | NO | YES | No parameters | No default |
| -ri | Shows the video information in raw json format | NO | YES | No parameters | No default |
| -jp | Shows raw json output in prettified view (with indentation: 4) (primarily used with -ri flag)| NO | YES | No parameters | No default |
| -ds | Set default download stream | YES | NO | `144p` `240p` `360p` `480p` `720p` `1080p` `1440p` `2160p` `4320p` `mp3` `max` (Pass any one of them) | `max` |
| -dc | Set default caption | YES | NO | All [ISO 639-1 Language Codes](https://www.w3schools.com/tags/ref_language_codes.asp) + some others + `none` for No Caption (Pass any one of them) eg: `en` for English | `none` |
| -df | Set custom download folder path | YES | NO | Use the full path excluding the last trailing slash within double quotes eg(in Linux): `"/path/to/folder"` (Make sure the folder path you enterted is already created and accessable) | Within `PytubePP Downloads` folder in your System's `Downloads` folder |
| -r | Reset to default configuration (Download Folder, Default Stream) | NO | NO | No parameters | No default |
| -sc | Show all current user configurations | NO | NO | No parameters | No default |
| -ct | Clear temporary files (audio, video, thumbnail) of the failed, incomplete downloads | NO | NO | No parameters | No default |
### 🛠️ Contributing / Building from Source
Want to be part of this? Feel free to contribute...!! Pull Requests are always welcome...!! (^_^) Follow these simple steps to start building:
* Make sure to install Python, FFmpeg, Node.js and Git before proceeding.
1. Fork this repo in your github account.
2. Git clone the forked repo in your local machine.
> Use `python3` and `pip3` commands instead of `python` and `pip` if you are on Linux or MacOS.
3. Install python dependencies
```terminal
pip install -r requirements.txt
```
4. build, install and test the module
```terminal
python -m build // build the module
pip install .\dist\pytubepp-<version>-py3-none-any.whl // install the module (give the path to the newly genrated whl file based on your OS path style and don't forget to replace the <version> with the actual version number)
```
5. Do the changes, Send a Pull Request with proper Description (NOTE: Pull Requests Without Proper Description will be Rejected)
⭕ Noticed any Bugs? or Want to give me some suggetions? always feel free to open an issue...!!
### 📝 License & Usage
pytubePP - (Pytube Post Processor) is a Fully Open Sourced Project licensed under MIT License. Anyone can view, modify, use (personal and commercial) or distribute it's sources without any attribution and extra permissions.
PytubePP - (Pytube Post Processor) is a Fully Open Sourced Project licensed under MIT License. Anyone can view, modify, use (personal and commercial) or distribute it's sources without any attribution and extra permissions.
**🌟 Liked this project? Please consider giving it a star to show me your appreciation**
<br></br>
⚖️ NOTE: YouTube is a trademark of Google LLC. Use of this trademark is subject to Google Permissions. Downloading and using Copyrighted YouTube Content for Commercial pourposes are not allowed by YouTube Terms without proper Permissions from the Creator. We don't promote this kinds of activity, You should use the downloaded contents wisely and at your own responsibility.
****

60
pyproject.toml Normal file
View File

@@ -0,0 +1,60 @@
[build-system]
requires = ["setuptools>=67.4.0"]
build-backend = "setuptools.build_meta"
[project]
name = "pytubepp"
version = "1.1.5"
authors = [
{ name="Subhamoy Biswas", email="hey@neosubhamoy.com" },
]
description = "A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
keywords = ["youtube", "download", "video", "pytube", "cli"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python",
"Topic :: Internet",
"Topic :: Multimedia :: Video",
"Topic :: Multimedia :: Sound/Audio :: Conversion",
"Topic :: Multimedia :: Video :: Conversion",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Terminals",
"Topic :: Utilities",
]
dependencies = [
"pytubefix",
"requests",
"ffmpy",
"mutagen",
"tabulate",
"tqdm",
"appdirs",
"setuptools",
]
[project.scripts]
pytubepp = "pytubepp.main:main"
[project.urls]
"Homepage" = "https://github.com/neosubhamoy/pytubepp"
"Bug Reports" = "https://github.com/neosubhamoy/pytubepp/issues"
[tool.setuptools.packages.find]
include = ["pytubepp*"]
[tool.setuptools]
license-files = ["LICENSE"]

54
pytubepp/config.py Normal file
View File

@@ -0,0 +1,54 @@
import os, json, platform, appdirs, tempfile
def get_download_folder():
system = platform.system()
if system in ["Windows", "Darwin", "Linux"]:
cli_download_dir = os.path.join(os.path.expanduser("~"), "Downloads", "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
else:
cli_download_dir = os.path.join(appdirs.user_download_dir(), "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
DEFAULT_CONFIG = {
'downloadDIR': get_download_folder(),
'defaultStream': 'max',
'defaultCaption': 'none',
}
def get_temporary_directory():
temp_dir = tempfile.gettempdir()
cli_temp_dir = os.path.join(temp_dir, 'pytubepp')
os.makedirs(cli_temp_dir, exist_ok=True)
return cli_temp_dir
def load_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return json.load(f)
else:
return DEFAULT_CONFIG
def save_config(config):
config_dir = appdirs.user_config_dir('pytubepp')
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, 'config.json')
with open(config_path, 'w') as f:
json.dump(config, f, indent=4)
def update_config(key, value):
config = load_config()
config[key] = value
save_config(config)
def reset_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
os.remove(config_path)
print('\nConfig reset successful!')
else:
print('\nAlready using the default configs! Not resetting...!')

88
pytubepp/download.py Normal file
View File

@@ -0,0 +1,88 @@
from tqdm import tqdm
from .config import get_temporary_directory, load_config
from .utils import get_unique_filename, postprocess_cleanup
import os, re, requests, shutil, sys, random, ffmpy
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def download_progressive(stream, itag, title, resolution, file_extention, captions, caption_code=None, tempDIR=tempDIR, downloadDIR=downloadDIR):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag)
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video+Audio")
random_filename = str(random.randint(1000000000, 9999999999))
filename = random_filename + '_vdo.' + file_extention
output_temp_file = os.path.join(tempDIR, filename)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention)) if not caption_code else os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '_' + caption_code + '.' + file_extention))
selected_vdo.download(output_path=tempDIR, filename=filename)
if caption_code:
print(f'Downloading Caption ({caption_code})...')
caption = captions[caption_code]
caption_file = os.path.join(tempDIR, random_filename + '_cap.srt')
caption.save_captions(caption_file)
print('Processing...')
devnull = open(os.devnull, 'w')
output_temp_file_with_subs = os.path.join(tempDIR, random_filename + '_merged.' + file_extention)
ff = ffmpy.FFmpeg(
inputs={output_temp_file: None},
outputs={output_temp_file_with_subs: ['-i', caption_file, '-c', 'copy', '-c:s', 'mov_text', '-metadata:s:s:0', f'language={caption_code}', '-metadata:s:s:0', f'title={caption_code}', '-metadata:s:s:0', f'handler_name={caption_code}']}
)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
shutil.move(output_temp_file_with_subs, output_file)
postprocess_cleanup(tempDIR, ['_vdo.' + file_extention, '_cap.srt', '_merged.' + file_extention], random_filename)
print('Done! 🎉')
else:
print('Processing...')
shutil.move(output_temp_file, output_file)
print('Done! 🎉')
def download_nonprogressive(stream, itag_vdo, itag_ado, file_extention, output_path):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag_vdo)
selected_ado = stream.get_by_itag(itag_ado)
random_filename = str(random.randint(1000000000, 9999999999))
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video")
selected_vdo.download(output_path=output_path, filename=random_filename + '_vdo.' + file_extention)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.' + file_extention)
return random_filename
def download_audio(stream, itag, output_path):
global total_filesize, progress_bar
selected_ado = stream.get_by_itag(itag)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
random_filename = str(random.randint(1000000000, 9999999999))
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.mp4')
return random_filename
def download_thumbnail(url, file_path):
print('Downloading thumbnail...')
maxres_url = re.sub(r'/[^/]*\.jpg.*$', '/maxresdefault.jpg', url)
hq_url = re.sub(r'/[^/]*\.jpg.*$', '/hqdefault.jpg', url)
response = requests.get(maxres_url, stream=True)
if response.status_code != 200:
response = requests.get(hq_url, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as file:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, file)
else:
print('Failed to download thumbnail...!')
sys.exit()
def progress(chunk, file_handle, bytes_remaining):
chunk_size = total_filesize - bytes_remaining
progress_bar.update(chunk_size - progress_bar.n)
if bytes_remaining == 0:
progress_bar.close()

View File

@@ -1,395 +1,364 @@
from pytube import YouTube
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB
from pytubefix import YouTube
from tabulate import tabulate
from tqdm import tqdm
import appdirs, ffmpy, requests, re, os, sys, random, shutil, platform, json, argparse, tempfile, pkg_resources
from .config import get_temporary_directory, load_config, update_config, reset_config
from .download import download_progressive, download_nonprogressive, download_audio, progress
from .postprocess import merge_audio_video, convert_to_mp3
from .utils import get_version, clear_temp_files, is_valid_url, network_available, ffmpeg_installed, nodejs_installed
import appdirs, os, re, sys, argparse, json
def get_version():
try:
return pkg_resources.get_distribution("pytubepp").version
except pkg_resources.DistributionNotFound:
return "Unknown"
class YouTubeDownloader:
def __init__(self):
self.user_config = load_config()
self.download_dir = self.user_config['downloadDIR']
self.temp_dir = get_temporary_directory()
self.config_dir = appdirs.user_config_dir('pytubepp')
self.default_stream = self.user_config['defaultStream']
self.default_caption = self.user_config['defaultCaption']
self.version = get_version()
def get_download_folder():
system = platform.system()
if system in ["Windows", "Darwin", "Linux"]:
cli_download_dir = os.path.join(os.path.expanduser("~"), "Downloads", "Pytube Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
else:
cli_download_dir = os.path.join(appdirs.user_download_dir(), "Pytube Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
# Video attributes
self.video = None
self.author = None
self.title = None
self.thumbnail = None
self.views = None
self.stream = None
self.maxres = None
def get_temporary_directory():
temp_dir = tempfile.gettempdir()
cli_temp_dir = os.path.join(temp_dir, 'pytubepp')
os.makedirs(cli_temp_dir, exist_ok=True)
return cli_temp_dir
self.stream_resolutions = {
'4320p': {'allowed_streams': ['8k', '4320', '4320p'], 'message': ['4320p', '[8k, 4320, 4320p]']},
'2160p': {'allowed_streams': ['4k', '2160', '2160p'], 'message': ['2160p', '[4k, 2160, 2160p]']},
'1440p': {'allowed_streams': ['2k', '1440', '1440p'], 'message': ['1440p', '[2k, 1440, 1440p]']},
'1080p': {'allowed_streams': ['fhd', '1080', '1080p'], 'message': ['1080p', '[fhd, 1080, 1080p]']},
'720p': {'allowed_streams': ['hd', '720', '720p'], 'message': ['720p', '[hd, 720, 720p]']},
'480p': {'allowed_streams': ['480', '480p'], 'message': ['480p', '[480, 480p]']},
'360p': {'allowed_streams': ['360', '360p'], 'message': ['360p', '[360, 360p]']},
'240p': {'allowed_streams': ['240', '240p'], 'message': ['240p', '[240, 240p]']},
'144p': {'allowed_streams': ['144', '144p'], 'message': ['144p', '[144, 144p]']},
'mp3': {'allowed_streams': ['mp3'], 'message': ['mp3', '[mp3]']}
}
DEFAULT_CONFIG = {
'downloadDIR': get_download_folder(),
'defaultStream': 'max',
}
def load_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return json.load(f)
else:
return DEFAULT_CONFIG
def save_config(config):
config_dir = appdirs.user_config_dir('pytubepp')
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, 'config.json')
with open(config_path, 'w') as f:
json.dump(config, f, indent=4)
def update_config(key, value):
config = load_config()
config[key] = value
save_config(config)
def reset_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
os.remove(config_path)
print('\nConfig reset successful!')
else:
print('\nAlready using the default configs! Not resetting...!')
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
configDIR = appdirs.user_config_dir('pytubepp')
defaultStream = userConfig['defaultStream']
version = get_version()
def merge_audio_video(title, resolution, file_extention, random_filename, tempDIR=tempDIR, downloadDIR=downloadDIR):
video_file = os.path.join(tempDIR, random_filename + '_vdo.' + file_extention)
audio_file = os.path.join(tempDIR, random_filename + '_ado.' + file_extention)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.' + file_extention)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
input_params = {video_file: None, audio_file: None}
output_params = {output_temp_file: ['-c:v', 'copy', '-c:a', 'copy']}
print('Processing...')
devnull = open(os.devnull, 'w')
ff = ffmpy.FFmpeg(inputs=input_params, outputs=output_params)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
os.rename(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_vdo.' + file_extention, '_ado.' + file_extention, '_merged.' + file_extention], random_filename)
print('Done! 🎉')
def convert_to_mp3(title, thumbnail_url, random_filename, mp3_artist='Unknown', mp3_title='Unknown', mp3_album='Unknown', tempDIR=tempDIR, downloadDIR=downloadDIR):
image_file = os.path.join(tempDIR, random_filename + '_thumbnail.jpg')
download_thumbnail(thumbnail_url, image_file)
audio_file = os.path.join(tempDIR, random_filename + '_ado.mp4')
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_audio.mp3'))
print('Processing...')
devnull = open(os.devnull, 'w')
video_file = os.path.join(tempDIR, random_filename + '_thumbnail.mp4')
ff1 = ffmpy.FFmpeg(
inputs={image_file: '-loop 1 -t 1'},
outputs={video_file: '-vf "scale=1280:720" -r 1 -c:v libx264 -t 1'}
)
ff1.run(stdout=devnull, stderr=devnull)
merged_file = os.path.join(tempDIR, random_filename + '_merged.mp4')
ff2 = ffmpy.FFmpeg(
inputs={video_file: None, audio_file: None},
outputs={merged_file: '-c:v copy -c:a copy'}
)
ff2.run(stdout=devnull, stderr=devnull)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.mp3')
ff3 = ffmpy.FFmpeg(
inputs={merged_file: None},
outputs={output_temp_file: '-vn -c:a libmp3lame -q:a 2'}
)
ff3.run(stdout=devnull, stderr=devnull)
devnull.close()
audio = ID3(output_temp_file)
audio.add(TIT2(encoding=3, text=mp3_title))
audio.add(TPE1(encoding=3, text=mp3_artist))
audio.add(TALB(encoding=3, text=mp3_album))
with open(image_file, 'rb') as img:
audio.add(APIC(
encoding=3,
mime='image/jpeg',
type=3,
desc=u'Cover',
data=img.read()
))
audio.save()
os.rename(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_thumbnail.jpg', '_thumbnail.mp4', '_ado.mp4', '_merged.mp4'], random_filename)
print('Done! 🎉')
def download_progressive(stream, itag, title, resolution, file_extention, tempDIR=tempDIR, downloadDIR=downloadDIR):
global vdo_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag)
vdo_filesize = selected_vdo.filesize
progress_bar = tqdm(total=vdo_filesize, unit='B', unit_scale=True, desc="Downloading")
random_filename = str(random.randint(1000000000, 9999999999))
filename = random_filename + '_vdo.' + file_extention
output_temp_file = os.path.join(tempDIR, filename)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
selected_vdo.download(output_path=tempDIR, filename=filename)
print('Processing...')
os.rename(output_temp_file, output_file)
print('Done! 🎉')
def download_nonprogressive(stream, itag_vdo, itag_ado, file_extention, output_path):
global vdo_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag_vdo)
selected_ado = stream.get_by_itag(itag_ado)
vdo_filesize = selected_vdo.filesize
progress_bar = tqdm(total=vdo_filesize, unit='B', unit_scale=True, desc="Downloading")
random_filename = str(random.randint(1000000000, 9999999999))
selected_vdo.download(output_path=output_path, filename=random_filename + '_vdo.' + file_extention)
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.' + file_extention)
return random_filename
def download_audio(stream, itag, output_path):
global vdo_filesize, progress_bar
selected_ado = stream.get_by_itag(itag)
vdo_filesize = selected_ado.filesize
progress_bar = tqdm(total=vdo_filesize, unit='B', unit_scale=True, desc="Downloading")
random_filename = str(random.randint(1000000000, 9999999999))
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.mp4')
return random_filename
def download_thumbnail(url, file_path):
print('Downloading thumbnail...')
maxres_url = re.sub(r'/[^/]*\.jpg.*$', '/maxresdefault.jpg', url)
hq_url = re.sub(r'/[^/]*\.jpg.*$', '/hqdefault.jpg', url)
response = requests.get(maxres_url, stream=True)
if response.status_code != 200:
response = requests.get(hq_url, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as file:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, file)
else:
print('Failed to download thumbnail...!')
def set_video_info(self, link):
if not network_available():
print('\nRequest timeout! Please check your network and try again...!!')
sys.exit()
def progress(chunk, file_handle, bytes_remaining):
chunk_size = vdo_filesize - bytes_remaining
progress_bar.update(chunk_size - progress_bar.n)
if not nodejs_installed():
print("\nWarning: Node.js is not installed or not found in PATH!")
print("BotGuard poToken generation will not work properly without Node.js environment")
print("Please install Node.js from https://nodejs.org/en/download\n")
if bytes_remaining == 0:
progress_bar.close()
def postprocess_cleanup(dir, files, random_filename):
for file in files:
file_path = os.path.join(dir, random_filename + file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(e)
def clear_temp_files():
if os.listdir(tempDIR) != []:
for file in os.listdir(tempDIR):
file_path = os.path.join(tempDIR, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
print(f'Removed: {file}')
except Exception as e:
print(e)
else:
print('No temporary files found to clear...!')
def get_unique_filename(filename, directory=downloadDIR):
base_name, extension = os.path.splitext(filename)
counter = 1
while os.path.exists(os.path.join(directory, filename)):
filename = f"{base_name} ({counter}){extension}"
counter += 1
return filename
def is_valid_url(url):
match = re.search(r"(https?://(?:www\.|music\.)?youtube\.com/watch\?v=[^&]{11}|https?://youtu\.be/[^?&]*(\?si=[^&]*)?)", url)
return match
def set_global_video_info(link):
if is_valid_url(link):
global video, author, title, thumbnail, views, stream, stream_resolutions, maxres
link = is_valid_url(link).group(1)
video = YouTube(link, on_progress_callback=progress)
author = video.author
title = re.sub(r'[\\/*?:"<>|]', '_', author + '-' + video.title)
thumbnail = video.thumbnail_url
views = str(video.views)
stream = video.streams
stream_resolutions = {
'2160p': {
'allowed_streams': ['4k', '2160', '2160p'],
'message': ['2160p', 'webm', 'vp90', 'opus', '[4k, 2160, 2160p]']
},
'1440p': {
'allowed_streams': ['2k', '1440', '1440p'],
'message': ['1440p', 'webm', 'vp90', 'opus', '[2k, 1440, 1440p]']
},
'1080p': {
'allowed_streams': ['fhd', '1080', '1080p'],
'message': ['1080p', 'mp4', 'avc1', 'aac', '[fhd, 1080, 1080p]']
},
'720p': {
'allowed_streams': ['hd', '720', '720p'],
'message': ['720p', 'mp4', 'avc1', 'aac', '[hd, 720, 720p]']
},
'480p': {
'allowed_streams': ['480', '480p'],
'message': ['480p', 'mp4', 'avc1', 'aac', '[480, 480p]']
},
'360p': {
'allowed_streams': ['360', '360p'],
'message': ['360p', 'mp4', 'avc1', 'aac', '[360, 360p]']
},
'240p': {
'allowed_streams': ['240', '240p'],
'message': ['240p', 'mp4', 'avc1', 'aac', '[240, 240p]']
},
'144p': {
'allowed_streams': ['144', '144p'],
'message': ['144p', 'mp4', 'avc1', 'aac', '[144, 144p]']
},
'mp3': {
'allowed_streams': ['mp3'],
'message': ['mp3', 'mp3', 'none', 'mp3', '[mp3]']
}
}
for res in stream_resolutions.keys():
if res != 'mp3' and stream.filter(res=res):
maxres = res
self.video = YouTube(link, 'WEB', on_progress_callback=progress)
self.author = self.video.author
self.title = re.sub(r'[\\/*?:"<>|]', '_', self.author + ' - ' + self.video.title)
self.thumbnail = self.video.thumbnail_url
self.views = str(self.video.views)
self.stream = self.video.streams
self.captions = self.video.captions
# Find maximum resolution
for res in self.stream_resolutions.keys():
if res != 'mp3' and self.stream.filter(res=res):
self.maxres = res
break
return True
else:
return False
def show_video_info(link):
if set_global_video_info(link):
def get_stream_info(self, res, matching_stream):
"""Helper method to get stream information based on resolution"""
stream_info = {}
if res == 'mp3':
stream_info = {
'type': "audio/mp3",
'filesize': f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB",
'raw_filesize': matching_stream.filesize,
'fps': None,
'raw_fps': None,
'vdo_codec': None,
'ado_codec': matching_stream.audio_codec,
'vdo_bitrate': None,
'ado_bitrate': matching_stream.abr
}
elif res == '360p':
stream_info = {
'type': matching_stream.mime_type,
'filesize': f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB",
'raw_filesize': matching_stream.filesize,
'fps': f"{matching_stream.fps}fps",
'raw_fps': matching_stream.fps,
'vdo_codec': matching_stream.video_codec,
'ado_codec': matching_stream.audio_codec,
'vdo_bitrate': f"{matching_stream.bitrate / 1024:.0f}kbps",
'ado_bitrate': matching_stream.abr
}
else:
_select_suitable_audio_stream = lambda stream: 251 if stream.mime_type == 'video/webm' else 140
# Check for HDR variants first
hdr_stream = None
if res in ['4320p', '2160p', '1440p', '1080p', '720p']:
hdr_itags = {'4320p': 702, '2160p': 701, '1440p': 700, '1080p': 699, '720p': 698}
hdr_stream = self.stream.get_by_itag(hdr_itags.get(res))
# Use HDR stream if available, otherwise use the original stream
final_stream = hdr_stream if hdr_stream else matching_stream
audio_stream = self.stream.get_by_itag(_select_suitable_audio_stream(final_stream))
total_size = final_stream.filesize + audio_stream.filesize
stream_info = {
'type': final_stream.mime_type,
'filesize': f"{total_size / (1024 * 1024 * 1024):.2f} GB" if total_size >= 1073741824 else f"{total_size / (1024 * 1024):.2f} MB",
'raw_filesize': total_size,
'fps': f"{final_stream.fps}fps",
'raw_fps': final_stream.fps,
'vdo_codec': final_stream.video_codec,
'ado_codec': audio_stream.audio_codec,
'vdo_bitrate': f"{final_stream.bitrate / 1024:.0f}kbps",
'ado_bitrate': audio_stream.abr,
'is_hdr': bool(hdr_stream), # Track if this is an HDR stream
'stream_itag': final_stream.itag # Track the actual itag being used
}
return stream_info
def show_video_info(self, link):
if self.set_video_info(link):
table = []
found = False
for res in stream_resolutions.keys():
if found or (res not in ['mp3'] and stream.filter(res=res)) or (res == 'mp3' and stream.get_by_itag(140)):
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
if res == 'mp3':
matching_stream = stream.get_by_itag(140)
else:
matching_stream = next((s for s in stream if s.resolution == res), None)
if matching_stream is not None:
filesize = f"{matching_stream.filesize / (1024 * 1024):.2f} MB"
else:
filesize = "N/A"
message = stream_resolutions[res]['message'] + [filesize]
matching_stream = self.stream.get_by_itag(140) if res == 'mp3' else next((s for s in self.stream if s.resolution == res), None)
if matching_stream:
stream_info = self.get_stream_info(res, matching_stream)
message = self.stream_resolutions[res]['message'] + [
stream_info['type'],
stream_info['filesize'],
stream_info['fps'] if stream_info['fps'] else "none",
stream_info['vdo_codec'] if stream_info['vdo_codec'] else "none",
stream_info['ado_codec'],
stream_info['vdo_bitrate'] if stream_info['vdo_bitrate'] else "none",
stream_info['ado_bitrate']
]
table.append(message)
if not found:
print('Sorry, No video streams found....!!!')
sys.exit()
print(f'\nTitle: {video.title}\nAuthor: {author}\nViews: {views}\n')
print(tabulate(table, headers=['Streams', 'Format', 'Video Codec', 'Audio Codec', 'Aliases', 'Size']))
print(f'\nTitle: {self.video.title}\nAuthor: {self.author}\nPublished On: {self.video.publish_date.strftime("%d/%m/%Y")}\nDuration: {f"{self.video.length//3600:02}:{(self.video.length%3600)//60:02}:{self.video.length%60:02}" if self.video.length >= 3600 else f"{(self.video.length%3600)//60:02}:{self.video.length%60:02}"}\nViews: {self.views}\nCaptions: {[caption.code for caption in self.captions.keys()] or "Unavailable"}\n')
print(tabulate(table, headers=['Stream', 'Alias (for -s flag)', 'Format', 'Size', 'FrameRate', 'V-Codec', 'A-Codec', 'V-BitRate', 'A-BitRate']))
print('\n')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def get_allowed_streams(link):
if set_global_video_info(link):
def show_all_streams(self, link):
if self.set_video_info(link):
print(f"Available Streams({len(self.stream)}):")
if self.stream:
for stream in self.stream:
print(stream)
else:
print('No stream available!')
print(f"\nAvailable Captions({len(self.captions)}):")
if self.captions:
for caption in self.captions:
print(caption)
else:
print('No caption available!')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def show_raw_info(self, link, prettify=False):
if self.set_video_info(link):
streams_list = []
found = False
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
matching_stream = self.stream.get_by_itag(140) if res == 'mp3' else next((s for s in self.stream if s.resolution == res), None)
if matching_stream:
stream_info = self.get_stream_info(res, matching_stream)
streams_list.append({
'itag': stream_info.get('stream_itag', matching_stream.itag),
'res': res,
'mime_type': stream_info['type'],
'file_size': stream_info['raw_filesize'],
'fps': stream_info['raw_fps'],
'vcodec': stream_info['vdo_codec'],
'acodec': stream_info['ado_codec'],
'vbitrate': stream_info['vdo_bitrate'],
'abitrate': stream_info['ado_bitrate'],
'is_hdr': stream_info.get('is_hdr', False)
})
if not found:
print('Sorry, No video streams found....!!!')
sys.exit()
output = {
'id': self.video.video_id,
'title': self.video.title,
'author': self.author,
'thumbnail_url': self.thumbnail,
'views': self.video.views,
'published_on': self.video.publish_date.strftime('%d/%m/%Y'),
'duration': self.video.length,
'streams': streams_list,
'captions': [caption.code for caption in self.captions.keys()] or None
}
print(json.dumps(output, indent=4 if prettify else None))
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def get_allowed_streams(self, link):
if self.set_video_info(link):
allowed_streams = []
found = False
for res in stream_resolutions.keys():
if found or (res not in ['mp3'] and stream.filter(res=res)) or (res == 'mp3' and stream.get_by_itag(140)):
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
allowed_streams.extend(stream_resolutions[res]['allowed_streams'])
allowed_streams.extend(self.stream_resolutions[res]['allowed_streams'])
return allowed_streams
else:
print('\nInvalid video link! Please enter a valid video url...!!')
return []
def print_short_info(chosen_stream):
if chosen_stream in ['720', '720p', 'hd']:
print(f'\nVideo: {title}\nSelected Stream: 720p (HD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['360', '360p']:
print(f'\nVideo: {title}\nSelected Stream: 360p (SD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['1080', '1080p', 'fhd']:
print(f'\nVideo: {title}\nSelected Stream: 1080p (FHD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['480', '480p']:
print(f'\nVideo: {title}\nSelected Stream: 480p (SD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['240', '240p']:
print(f'\nVideo: {title}\nSelected Stream: 240p (LD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['144', '144p']:
print(f'\nVideo: {title}\nSelected Stream: 144p (LD) [mp4 - avc1 - aac]\n')
elif chosen_stream in ['2160', '2160p', '4k']:
print(f'\nVideo: {title}\nSelected Stream: 2160p (4K) [webm - vp90 - opus]\n')
elif chosen_stream in ['1440', '1440p', '2k']:
print(f'\nVideo: {title}\nSelected Stream: 1440p (2K) [webm - vp90 - opus]\n')
elif chosen_stream == 'mp3':
print(f'\nVideo: {title}\nSelected Stream: mp3 (Audio) [mp3 - dynamic - 44.1khz]\n')
def get_allowed_captions(self, link):
if self.set_video_info(link):
return self.captions.keys()
else:
print('\nInvalid video link! Please enter a valid video url...!!')
return []
def print_short_info(self, chosen_stream, chosen_caption=None):
print(f'\nTitle: {self.title}')
if chosen_stream == 'mp3':
print(f'Selected: Audio [128kbps (140)]')
return
if chosen_stream in ['360', '360p']:
print(f"Selected: Video [360p (18)] + Audio [96kbps (18)]{f' + Caption [{chosen_caption}]' if chosen_caption else ''}")
return
_select_suitable_audio_stream = lambda stream: 251 if stream.mime_type == 'video/webm' else 140
res = next((k for k, v in self.stream_resolutions.items() if chosen_stream in v['allowed_streams']), None)
if res:
hdr_stream = None
if res in ['4320p', '2160p', '1440p', '1080p', '720p']:
hdr_itags = {'4320p': 702, '2160p': 701, '1440p': 700, '1080p': 699, '720p': 698}
hdr_stream = self.stream.get_by_itag(hdr_itags.get(res))
matching_stream = hdr_stream if hdr_stream else self.stream.filter(res=res).first()
audio_stream = self.stream.get_by_itag(_select_suitable_audio_stream(matching_stream))
print(f"Selected: Video [{res} ({matching_stream.itag})] + Audio [{audio_stream.abr} ({audio_stream.itag})]{f' + Caption [{chosen_caption}]' if chosen_caption else ''}")
def download_stream(self, link, chosen_stream, chosen_caption=None):
if not ffmpeg_installed():
print("\nWarning: FFmpeg is not installed or not found in PATH!")
print("Some core functionalities like video processing will not work properly without FFmpeg")
print("Please install FFmpeg, read https://github.com/neosubhamoy/pytubepp#%EF%B8%8F-installation for instructions\n")
sys.exit()
if self.set_video_info(link):
allowed_streams = self.get_allowed_streams(link)
allowed_captions = self.get_allowed_captions(link)
if chosen_caption and (chosen_caption not in allowed_captions):
print('\nInvalid caption code or caption not available! Please choose a different caption...!! (use -i to see available captions)')
sys.exit()
def download_stream(link, chosen_stream):
if set_global_video_info(link):
print_short_info(chosen_stream)
allowed_streams = get_allowed_streams(link)
if chosen_stream in allowed_streams:
if chosen_stream in ['720', '720p', 'hd']:
download_progressive(stream, 22, title, '720p', 'mp4')
elif chosen_stream in ['360', '360p']:
download_progressive(stream, 18, title, '360p', 'mp4')
self.print_short_info(chosen_stream, chosen_caption)
if chosen_stream in ['360', '360p']:
download_progressive(self.stream, 18, self.title, '360p', 'mp4', self.captions, chosen_caption)
elif chosen_stream in ['1080', '1080p', 'fhd']:
merge_audio_video(title, '1080p', 'mp4', download_nonprogressive(stream, 137, 140, 'mp4', tempDIR))
self._handle_1080p_download(chosen_caption)
elif chosen_stream in ['720', '720p', 'hd']:
self._handle_720p_download(chosen_caption)
elif chosen_stream in ['480', '480p']:
merge_audio_video(title, '480p', 'mp4', download_nonprogressive(stream, 135, 140, 'mp4', tempDIR))
merge_audio_video(self.title, '480p', 'mp4', download_nonprogressive(self.stream, 135, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif chosen_stream in ['240', '240p']:
merge_audio_video(title, '240p', 'mp4', download_nonprogressive(stream, 133, 140, 'mp4', tempDIR))
merge_audio_video(self.title, '240p', 'mp4', download_nonprogressive(self.stream, 133, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif chosen_stream in ['144', '144p']:
merge_audio_video(title, '144p', 'mp4', download_nonprogressive(stream, 160, 140, 'mp4', tempDIR))
merge_audio_video(self.title, '144p', 'mp4', download_nonprogressive(self.stream, 160, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif chosen_stream in ['4320', '4320p', '8k']:
self._handle_4320p_download(chosen_caption)
elif chosen_stream in ['2160', '2160p', '4k']:
merge_audio_video(title, '4k', 'webm', download_nonprogressive(stream, 313, 251, 'webm', tempDIR))
self._handle_2160p_download(chosen_caption)
elif chosen_stream in ['1440', '1440p', '2k']:
merge_audio_video(title, '2k', 'webm', download_nonprogressive(stream, 271, 251, 'webm', tempDIR))
self._handle_1440p_download(chosen_caption)
elif chosen_stream == 'mp3':
convert_to_mp3(title, thumbnail, download_audio(stream, 140, tempDIR), author, video.title, author)
convert_to_mp3(self.title, self.thumbnail, download_audio(self.stream, 140, self.temp_dir), self.author, self.video.title, self.author)
else:
print('\nInvalid download stream or stream not available! Please choose a different stream...!! (use -i to see available streams)')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def _handle_4320p_download(self, chosen_caption=None):
if self.stream.get_by_itag(702):
merge_audio_video(self.title, '8k', 'mp4', download_nonprogressive(self.stream, 702, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(571):
merge_audio_video(self.title, '8k', 'mp4', download_nonprogressive(self.stream, 571, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
def _handle_2160p_download(self, chosen_caption=None):
if self.stream.get_by_itag(701):
merge_audio_video(self.title, '4k', 'mp4', download_nonprogressive(self.stream, 701, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(315):
merge_audio_video(self.title, '4k', 'webm', download_nonprogressive(self.stream, 315, 251, 'webm', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(313):
merge_audio_video(self.title, '4k', 'webm', download_nonprogressive(self.stream, 313, 251, 'webm', self.temp_dir), self.captions, chosen_caption)
def _handle_1440p_download(self, chosen_caption=None):
if self.stream.get_by_itag(700):
merge_audio_video(self.title, '2k', 'mp4', download_nonprogressive(self.stream, 700, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(308):
merge_audio_video(self.title, '2k', 'webm', download_nonprogressive(self.stream, 308, 251, 'webm', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(271):
merge_audio_video(self.title, '2k', 'webm', download_nonprogressive(self.stream, 271, 251, 'webm', self.temp_dir), self.captions, chosen_caption)
def _handle_1080p_download(self, chosen_caption=None):
if self.stream.get_by_itag(699):
merge_audio_video(self.title, '1080p', 'mp4', download_nonprogressive(self.stream, 699, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(299):
merge_audio_video(self.title, '1080p', 'mp4', download_nonprogressive(self.stream, 299, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(137):
merge_audio_video(self.title, '1080p', 'mp4', download_nonprogressive(self.stream, 137, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
def _handle_720p_download(self, chosen_caption=None):
if self.stream.get_by_itag(698):
merge_audio_video(self.title, '720p', 'mp4', download_nonprogressive(self.stream, 698, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(298):
merge_audio_video(self.title, '720p', 'mp4', download_nonprogressive(self.stream, 298, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
elif self.stream.get_by_itag(136):
merge_audio_video(self.title, '720p', 'mp4', download_nonprogressive(self.stream, 136, 140, 'mp4', self.temp_dir), self.captions, chosen_caption)
def main():
parser = argparse.ArgumentParser(description=f'pytubePP (Pytube Post Processor) v{version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
downloader = YouTubeDownloader()
parser = argparse.ArgumentParser(description=f'PytubePP (Pytube Post Processor) v{downloader.version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
parser.add_argument('url', nargs='?', default=None, help='url of the youtube video')
parser.add_argument('-df', '--download-folder', default=argparse.SUPPRESS, help='set custom download folder path (default: ~/Downloads/Pytube Downloads) [arg eg: "/path/to/folder"]')
parser.add_argument('-ds', '--default-stream', default=argparse.SUPPRESS, help='set default download stream (default: max) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, mp3, max]')
parser.add_argument('-s', '--stream', default=argparse.SUPPRESS, help='choose download stream for the current video (default: your chosen --default-stream) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, 144, 240, 360, 480, 720, 1080, 1440, 2160, mp3, hd, fhd, 2k, 4k]')
parser.add_argument('-ds', '--default-stream', default=argparse.SUPPRESS, help='set default download stream (default: max) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, 4320p, mp3, max]')
parser.add_argument('-dc', '--default-caption', default=argparse.SUPPRESS, help='set default caption (default: none) [available arguments: all language codes, none]')
parser.add_argument('-s', '--stream', default=argparse.SUPPRESS, help='choose download stream for the current video (default: your chosen --default-stream) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, 4320p, 144, 240, 360, 480, 720, 1080, 1440, 2160, 4320, mp3, hd, fhd, 2k, 4k, 8k]')
parser.add_argument('-c', '--caption', default=argparse.SUPPRESS, help='choose caption to embed for the current video (default: your chosen --default-caption) [available arguments: all language codes, none]')
parser.add_argument('-i', '--show-info', action='store_true', help='show video info (title, author, views and available_streams)')
parser.add_argument('-ls', '--list-stream', action='store_true', help='list all available streams (video, audio, caption) (only for debuging purposes)')
parser.add_argument('-ri', '--raw-info', action='store_true', help='show video info in raw json format')
parser.add_argument('-jp', '--json-prettify', action='store_true', help='show json in prettified indented view')
parser.add_argument('-sc', '--show-config', action='store_true', help='show all current user config settings')
parser.add_argument('-r', '--reset-default', action='store_true', help='reset to default settings (download_folder and default_stream)')
parser.add_argument('-ct', '--clear-temp', action='store_true', help='clear temporary files (audio, video, thumbnail files of the failed, incomplete downloads)')
@@ -402,53 +371,201 @@ def main():
sys.exit(1)
if args.url:
if 'download_folder' in args:
if not is_valid_url(args.url):
print('\nInvalid video link! Please enter a valid video url...!!')
sys.exit()
# Handle warning messages for ignored flags
if hasattr(args, 'download_folder'):
print('\nVideo url supplied! igonering -df flag...!!')
if 'default_stream' in args:
if hasattr(args, 'default_stream'):
print('\nVideo url supplied! ignoreing -ds flag...!!')
if hasattr(args, 'default_caption'):
print('\nVideo url supplied! ignoreing -dc flag...!!')
if args.reset_default:
print('\nVideo url supplied! ignoreing -r flag...!!')
if args.clear_temp:
print('\nVideo url supplied! ignoreing -ct flag...!!')
if args.show_config:
print('\nVideo url supplied! ignoreing -sc flag...!!')
# Handle info display flags
if args.show_info:
show_video_info(args.url)
downloader.show_video_info(args.url)
if args.list_stream:
downloader.show_all_streams(args.url)
if args.raw_info:
downloader.show_raw_info(args.url, args.json_prettify)
if args.json_prettify and not args.raw_info:
print('\nMissing flag! -jp flag must be used with a flag which returns json data...!! (eg: -ri)')
if 'stream' in args:
download_stream(args.url, args.stream)
if 'stream' not in args and not args.show_info:
if set_global_video_info(args.url):
if defaultStream == 'max' and maxres != None:
download_stream(args.url, maxres)
return
if (defaultStream == 'mp3' and stream.get_by_itag(140)) or (defaultStream != 'max' and stream.filter(res=defaultStream)):
download_stream(args.url, defaultStream)
# Handle download cases
if hasattr(args, 'stream') and hasattr(args, 'caption'):
if downloader.set_video_info(args.url):
if (args.caption not in downloader.captions.keys()) and (args.caption != 'none'):
print('\nInvalid caption code or caption not available! Please choose a different caption...!! (use -i to see available captions)')
sys.exit()
elif args.caption == 'none':
downloader.download_stream(args.url, args.stream)
elif args.stream == 'mp3' and downloader.stream.get_by_itag(140):
print(f'\nYou have chosen to download mp3 stream! ( Captioning audio files is not supported )')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, args.stream)
else:
if maxres != None:
print(f'\nDefault stream not available! ( Default: {defaultStream} | Available: {maxres} )')
print('Download cancelled! exiting...!!')
else:
downloader.download_stream(args.url, args.stream, args.caption)
elif hasattr(args, 'stream'):
if downloader.set_video_info(args.url):
if downloader.default_caption == 'none':
downloader.download_stream(args.url, args.stream)
elif args.stream == 'mp3' and downloader.stream.get_by_itag(140):
print(f'\nYou have chosen to download mp3 stream! ( Captioning audio files is not supported )')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, args.stream)
else:
print('Download cancelled! exiting...!!')
elif downloader.default_caption in downloader.captions.keys():
downloader.download_stream(args.url, args.stream, downloader.default_caption)
else:
print(f'\nDefault caption not available! ( Default: {downloader.default_caption} | Available: {[caption.code for caption in downloader.captions.keys()] or "Nothing"} )')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, args.stream)
else:
print('Download cancelled! exiting...!!')
elif hasattr(args, 'caption'):
if downloader.set_video_info(args.url):
if (args.caption not in downloader.captions.keys()) and (args.caption != 'none'):
print('\nInvalid caption code or caption not available! Please choose a different caption...!! (use -i to see available captions)')
sys.exit()
elif args.caption == 'none':
if downloader.default_stream == 'max' and downloader.maxres:
downloader.download_stream(args.url, downloader.maxres)
elif downloader.default_stream == 'mp3' and downloader.stream.get_by_itag(140):
downloader.download_stream(args.url, downloader.default_stream)
elif downloader.default_stream != 'max' and downloader.stream.filter(res=downloader.default_stream):
downloader.download_stream(args.url, downloader.default_stream)
else:
if downloader.maxres:
print(f'\nDefault stream not available! ( Default: {downloader.default_stream} | Available: {downloader.maxres} )')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
if answer in ['yes', 'y']:
download_stream(args.url, maxres)
downloader.download_stream(args.url, downloader.maxres)
else:
print('Download cancelled! exiting...!!')
else:
print('Sorry, No downloadable video stream found....!!!')
elif downloader.default_stream == 'max' and downloader.maxres:
downloader.download_stream(args.url, downloader.maxres, args.caption)
elif downloader.default_stream == 'mp3' and downloader.stream.get_by_itag(140):
print(f'\nDefault stream set to mp3! ( Captioning audio files is not supported )')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.default_stream)
else:
print('Download cancelled! exiting...!!')
elif downloader.default_stream != 'max' and downloader.stream.filter(res=downloader.default_stream):
downloader.download_stream(args.url, downloader.default_stream, args.caption)
else:
if downloader.maxres:
print(f'\nDefault stream not available! ( Default: {downloader.default_stream} | Available: {downloader.maxres} )')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.maxres, args.caption)
else:
print('Download cancelled! exiting...!!')
else:
print('Sorry, No downloadable video stream found....!!!')
elif not any([args.show_info, args.raw_info, args.json_prettify, args.list_stream]): # If no info flags are set
if downloader.set_video_info(args.url):
if downloader.default_stream == 'max' and downloader.maxres:
if downloader.default_caption == 'none':
downloader.download_stream(args.url, downloader.maxres)
elif downloader.default_caption in downloader.captions.keys():
downloader.download_stream(args.url, downloader.maxres, downloader.default_caption)
else:
print(f'\nDefault caption not available! ( Default: {downloader.default_caption} | Available: {[caption.code for caption in downloader.captions.keys()] or "Nothing"} )')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.maxres)
else:
print('Download cancelled! exiting...!!')
elif (downloader.default_stream == 'mp3' and downloader.stream.get_by_itag(140)) or (downloader.default_stream != 'max' and downloader.stream.filter(res=downloader.default_stream)):
if downloader.default_caption == 'none':
downloader.download_stream(args.url, downloader.default_stream)
elif downloader.default_stream == 'mp3' and downloader.stream.get_by_itag(140):
print(f'\nDefault stream set to mp3! ( Captioning audio files is not supported )')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading ? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.default_stream)
else:
print('Download cancelled! exiting...!!')
elif downloader.default_caption in downloader.captions.keys():
downloader.download_stream(args.url, downloader.default_stream, downloader.default_caption)
else:
print(f'\nDefault caption not available! ( Default: {downloader.default_caption} | Available: {[caption.code for caption in downloader.captions.keys()] or "Nothing"} )')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.default_stream)
else:
print('Download cancelled! exiting...!!')
else:
if downloader.maxres:
print(f'\nDefault stream not available! ( Default: {downloader.default_stream} | Available: {downloader.maxres} )')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
if answer in ['yes', 'y']:
if downloader.default_caption == 'none':
downloader.download_stream(args.url, downloader.maxres)
elif downloader.default_caption in downloader.captions.keys():
downloader.download_stream(args.url, downloader.maxres, downloader.default_caption)
else:
print(f'\nDefault caption not available! ( Default: {downloader.default_caption} | Available: {[caption.code for caption in downloader.captions.keys()] or "Nothing"} )')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you still want to continue downloading without caption? [yes/no]\n')
if answer in ['yes', 'y']:
downloader.download_stream(args.url, downloader.maxres)
else:
print('Download cancelled! exiting...!!')
else:
print('Sorry, No downloadable video stream found....!!!')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
else:
if 'download_folder' in args:
if args.download_folder != downloadDIR:
if hasattr(args, 'download_folder'):
if args.download_folder != downloader.download_dir:
if os.path.isdir(args.download_folder):
update_config('downloadDIR', args.download_folder)
os.makedirs(args.download_folder, exist_ok=True)
@@ -458,9 +575,9 @@ def main():
else:
print('\nDownload folder path is the same! Not updating...!!')
if 'default_stream' in args:
if args.default_stream != defaultStream:
if args.default_stream in ['144p', '240p', '360p', '480p', '720p', '1080p', '1440p', '2160p', 'mp3', 'max']:
if hasattr(args, 'default_stream'):
if args.default_stream != downloader.default_stream:
if args.default_stream in ['144p', '240p', '360p', '480p', '720p', '1080p', '1440p', '2160p', '4320p', 'mp3', 'max']:
update_config('defaultStream', args.default_stream)
print(f'\nDefault stream updated to: {args.default_stream}')
else:
@@ -468,6 +585,16 @@ def main():
else:
print('\nDefault stream is the same! Not updating...!!')
if hasattr(args, 'default_caption'):
if args.default_caption != downloader.default_caption:
if not all(c.isalpha() or c in '.-' for c in args.default_caption) or len(args.default_caption) > 10:
print('\nInvalid caption code! Only a-z, A-Z, dash (-) and dot (.) are allowed with maximum 10 characters...!!')
else:
update_config('defaultCaption', args.default_caption)
print(f'\nDefault caption updated to: {args.default_caption}')
else:
print('\nDefault caption is the same! Not updating...!!')
if args.reset_default:
reset_config()
@@ -475,17 +602,25 @@ def main():
clear_temp_files()
if args.show_config:
print(f'\ndownloadDIR: {downloadDIR}\ntempDIR: {tempDIR}\nconfigDIR: {configDIR}\ndefaultStream: {defaultStream}\n')
print(f'\ntempDIR: {downloader.temp_dir} (Unchangeable) \nconfigDIR: {downloader.config_dir} (Unchangeable)\ndownloadDIR: {downloader.download_dir}\ndefaultStream: {downloader.default_stream}\ndefaultCaption: {downloader.default_caption}\n')
if args.version:
print(f'\npytubePP (Pytube Post Processor) - version: {version}\n')
print(f'pytubepp {downloader.version}')
if args.show_info:
print('\nNo video url supplied! exiting...!!')
if 'stream' in args:
if args.list_stream:
print('\nNo video url supplied! exiting...!!')
if args.raw_info:
print('\nNo video url supplied! exiting...!!')
if args.json_prettify and not args.raw_info:
print('\nMissing flag! -jp flag must be used with a flag which returns json data...!! (eg: -ri)')
if hasattr(args, 'stream'):
print('\nNo video url supplied! exiting...!!')
if __name__ == "__main__":
main()

114
pytubepp/postprocess.py Normal file
View File

@@ -0,0 +1,114 @@
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB
from .config import get_temporary_directory, load_config
from .utils import get_unique_filename, postprocess_cleanup
from .download import download_thumbnail
import os, shutil, ffmpy
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def merge_audio_video(title, resolution, file_extention, random_filename, captions, caption_code=None, tempDIR=tempDIR, downloadDIR=downloadDIR):
video_file = os.path.join(tempDIR, random_filename + '_vdo.' + file_extention)
audio_file = os.path.join(tempDIR, random_filename + '_ado.' + file_extention)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.' + file_extention)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention)) if not caption_code else os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '_' + caption_code + '.' + file_extention))
if caption_code:
print(f'Downloading Caption ({caption_code})...')
caption = captions[caption_code]
srt_file = os.path.join(tempDIR, random_filename + '_cap.srt')
caption.save_captions(srt_file)
vtt_file = os.path.join(tempDIR, random_filename + '_cap.vtt')
print('Processing...')
if file_extention == 'webm':
devnull = open(os.devnull, 'w')
ff_convert = ffmpy.FFmpeg(
inputs={srt_file: None},
outputs={vtt_file: None}
)
ff_convert.run(stdout=devnull, stderr=devnull)
subtitle_file = vtt_file
subtitle_codec = 'webvtt'
else:
subtitle_file = srt_file
subtitle_codec = 'mov_text'
input_params = {video_file: None, audio_file: None}
output_params = {output_temp_file: ['-i', subtitle_file, '-c:v', 'copy', '-c:a', 'copy',
'-c:s', subtitle_codec, '-metadata:s:s:0', f'language={caption_code}',
'-metadata:s:s:0', f'title={caption_code}', '-metadata:s:s:0', f'handler_name={caption_code}']}
devnull = open(os.devnull, 'w')
ff = ffmpy.FFmpeg(inputs=input_params, outputs=output_params)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
shutil.move(output_temp_file, output_file)
cleanup_files = ['_vdo.' + file_extention, '_ado.' + file_extention, '_cap.srt', '_merged.' + file_extention]
if file_extention == 'webm':
cleanup_files.append('_cap.vtt')
postprocess_cleanup(tempDIR, cleanup_files, random_filename)
print('Done! 🎉')
else:
input_params = {video_file: None, audio_file: None}
output_params = {output_temp_file: ['-c:v', 'copy', '-c:a', 'copy']}
print('Processing...')
devnull = open(os.devnull, 'w')
ff = ffmpy.FFmpeg(inputs=input_params, outputs=output_params)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_vdo.' + file_extention, '_ado.' + file_extention, '_merged.' + file_extention], random_filename)
print('Done! 🎉')
def convert_to_mp3(title, thumbnail_url, random_filename, mp3_artist='Unknown', mp3_title='Unknown', mp3_album='Unknown', tempDIR=tempDIR, downloadDIR=downloadDIR):
image_file = os.path.join(tempDIR, random_filename + '_thumbnail.jpg')
download_thumbnail(thumbnail_url, image_file)
audio_file = os.path.join(tempDIR, random_filename + '_ado.mp4')
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_audio.mp3'))
print('Processing...')
devnull = open(os.devnull, 'w')
video_file = os.path.join(tempDIR, random_filename + '_thumbnail.mp4')
ff1 = ffmpy.FFmpeg(
inputs={image_file: '-loop 1 -t 1'},
outputs={video_file: '-vf "scale=1280:720" -r 1 -c:v libx264 -t 1'}
)
ff1.run(stdout=devnull, stderr=devnull)
merged_file = os.path.join(tempDIR, random_filename + '_merged.mp4')
ff2 = ffmpy.FFmpeg(
inputs={video_file: None, audio_file: None},
outputs={merged_file: '-c:v copy -c:a copy'}
)
ff2.run(stdout=devnull, stderr=devnull)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.mp3')
ff3 = ffmpy.FFmpeg(
inputs={merged_file: None},
outputs={output_temp_file: '-vn -c:a libmp3lame -q:a 2'}
)
ff3.run(stdout=devnull, stderr=devnull)
devnull.close()
audio = ID3(output_temp_file)
audio.add(TIT2(encoding=3, text=mp3_title))
audio.add(TPE1(encoding=3, text=mp3_artist))
audio.add(TALB(encoding=3, text=mp3_album))
with open(image_file, 'rb') as img:
audio.add(APIC(
encoding=3,
mime='image/jpeg',
type=3,
desc=u'Cover',
data=img.read()
))
audio.save()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_thumbnail.jpg', '_thumbnail.mp4', '_ado.mp4', '_merged.mp4'], random_filename)
print('Done! 🎉')

70
pytubepp/utils.py Normal file
View File

@@ -0,0 +1,70 @@
from importlib.metadata import version
from .config import load_config, get_temporary_directory
import os, re, subprocess, platform
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def network_available():
try:
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', 'youtube.com']
subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except subprocess.CalledProcessError:
return False
def nodejs_installed():
try:
subprocess.run(['node', '--version'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def ffmpeg_installed():
try:
subprocess.run(['ffmpeg', '-version'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def get_version():
try:
return version('pytubepp')
except Exception as e:
return "Unknown"
def is_valid_url(url):
match = re.search(r"(https?://(?:www\.|music\.)?youtube\.com/watch\?v=[^&]{11}|https?://youtu\.be/[^?&]*(\?si=[^&]*)?)", url)
return match
def get_unique_filename(filename, directory=downloadDIR):
base_name, extension = os.path.splitext(filename)
counter = 1
while os.path.exists(os.path.join(directory, filename)):
filename = f"{base_name} ({counter}){extension}"
counter += 1
return filename
def postprocess_cleanup(dir, files, random_filename):
for file in files:
file_path = os.path.join(dir, random_filename + file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(e)
def clear_temp_files():
if os.listdir(tempDIR) != []:
for file in os.listdir(tempDIR):
file_path = os.path.join(tempDIR, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
print(f'Removed: {file}')
except Exception as e:
print(e)
else:
print('No temporary files found to clear...!')

View File

@@ -1,4 +1,4 @@
pytube
pytubefix
requests
ffmpy
mutagen
@@ -6,3 +6,6 @@ tabulate
tqdm
appdirs
setuptools
wheel
twine
build

View File

@@ -1,58 +0,0 @@
from setuptools import setup, find_packages
with open('README.md', 'r', encoding='utf8') as file:
readme = file.read()
setup(
name='pytubepp',
version='1.0.1',
description='A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!',
long_description=readme,
long_description_content_type='text/markdown',
author='Subhamoy Biswas',
author_email='hey@neosubhamoy.com',
license='MIT',
packages=find_packages(),
python_requires=">=3.8",
url="https://github.com/neosubhamoy/pytubepp",
entry_points={
'console_scripts': [
'pytubepp=pytubepp.main:main',
],
},
install_requires=[
'pytube',
'requests',
'ffmpy',
'mutagen',
'tabulate',
'tqdm',
'appdirs',
'setuptools',
],
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python",
"Topic :: Internet",
"Topic :: Multimedia :: Video",
"Topic :: Multimedia :: Sound/Audio :: Conversion",
"Topic :: Multimedia :: Video :: Conversion",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Terminals",
"Topic :: Utilities",
],
keywords=["youtube", "download", "video", "pytube", "cli"],
project_urls={
"Bug Reports": "https://github.com/neosubhamoy/pytubepp/issues",
},
)