1
1
mirror of https://github.com/neosubhamoy/pytubepp.git synced 2026-02-04 18:22:23 +05:30

21 Commits

12 changed files with 372 additions and 318 deletions

View File

@@ -18,14 +18,14 @@ jobs:
- name: 🐍 Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.12'
- name: 📦 Install dependencies
run: |
pip install -r requirements.txt
- name: 🛠️ Build package
run: python3 setup.py sdist bdist_wheel
run: python3 -m build
- name: 🚀 Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
# Compiled python modules.
# Compiled python modules and cache.
__pycache__/
*.pyc
# Setuptools distribution folder.

View File

View File

@@ -3,8 +3,9 @@
### A Simple CLI Tool to Download Your Favourite YouTube Videos Effortlessly!
[![status](https://img.shields.io/badge/status-active-brightgreen.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![verion](https://img.shields.io/badge/version-v1.0.7_stable-yellow.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![python](https://img.shields.io/badge/python-v3.12.x-blue?logo=python&style=flat)](https://www.python.org/downloads/)
[![PypiDownloads](https://img.shields.io/pypi/dm/pytubepp?color=brightgreen)](https://pypi.org/project/pytubepp/)
[![PypiVersion](https://img.shields.io/pypi/v/pytubepp?color=yellow)](https://pypi.org/project/pytubepp/)
[![python](https://img.shields.io/badge/python-v3.13-blue?logo=python&style=flat)](https://www.python.org/downloads/)
[![builds](https://img.shields.io/badge/builds-passing-brightgreen.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
[![PRs](https://img.shields.io/badge/PRs-welcome-blue.svg?style=flat)](https://github.com/neosubhamoy/pytubepp/)
@@ -17,9 +18,12 @@
* Smart Stream Selection
* Highly Configurable and Many More 😉
### **🧩 Dependencies**
### **📎 Pre-Requirements**
* [Python](https://www.python.org/downloads/) (>=3.8)
* [FFmpeg](https://ffmpeg.org/)
### **🧩 Python Dependencies**
* [pytubefix](https://pypi.org/project/pytubefix/)
* [FFmpeg (Not Pre-Included)](https://ffmpeg.org/)
* [ffmpy](https://pypi.org/project/ffmpy/)
* [mutagen](https://pypi.org/project/mutagen/)
* [tabulate](https://pypi.org/project/tabulate/)
@@ -29,22 +33,28 @@
* [setuptools](https://pypi.org/project/setuptools/)
### **🛠️ Installation**
You can install PytubePP in your system via PIP by simply running the below command
1. Install Python and PIP
- Linux (Debian): Python is pre-installed install PIP using `sudo apt install python3-pip`<br>
- Linux (Fedora): Python is pre-installed install PIP using `sudo dnf install python3-pip`<br>
- Linux (Arch): Python is pre-installed install PIP using `sudo pacman -S python-pip`<br>
- Windows (10/11): `winget install Python.Python.3.13`<br>
- MacOS (using Homebrew): `brew install python`<br>
- Android (using Termux): `pkg install python`
2. Install FFmpeg
- Linux (Debian): `sudo apt install ffmpeg`<br>
- Linux (Fedora): `sudo dnf install ffmpeg-free`<br>
- Linux (Arch): `sudo pacman -S ffmpeg`<br>
- Windows (10/11): `winget install ffmpeg`<br>
- MacOS (using Homebrew): `brew install ffmpeg`<br>
- Android (using Termux): `pkg install ffmpeg`
3. Install PytubePP (using PIP)
```terminal
pip install pytubepp
```
**IMPORTANT: Before installing PytubePP make sure FFmpeg is installed in your system and accesable via your cli interface (FFmpeg is Must Required as some of the core features of pytubePP relies on FFmpeg, but due to security reasons we can not ship it with the module)**
**>> Install FFmpeg (If you haven't already!)**
Linux (Ubuntu): `sudo apt install ffmpeg`<br>
Linux (Fedora): `sudo dnf install ffmpeg-free`<br>
Windows (10/11): `winget install ffmpeg`<br>
MacOS (using Homebrew): `brew install ffmpeg`<br>
Android (using Termux): `pkg install ffmpeg`
**NOTE: Always make sure 'PytubePP' and 'Pytubefix' is on the latest version to avoid issues (update them at least once a week) (Use the command below to update)**
```
pip install pytubefix pytubepp --upgrade
```

60
pyproject.toml Normal file
View File

@@ -0,0 +1,60 @@
[build-system]
requires = ["setuptools>=67.4.0"]
build-backend = "setuptools.build_meta"
[project]
name = "pytubepp"
version = "1.1.2"
authors = [
{ name="Subhamoy Biswas", email="hey@neosubhamoy.com" },
]
description = "A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
keywords = ["youtube", "download", "video", "pytube", "cli"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python",
"Topic :: Internet",
"Topic :: Multimedia :: Video",
"Topic :: Multimedia :: Sound/Audio :: Conversion",
"Topic :: Multimedia :: Video :: Conversion",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Terminals",
"Topic :: Utilities",
]
dependencies = [
"pytubefix",
"requests",
"ffmpy",
"mutagen",
"tabulate",
"tqdm",
"appdirs",
"setuptools",
]
[project.scripts]
pytubepp = "pytubepp.main:main"
[project.urls]
"Homepage" = "https://github.com/neosubhamoy/pytubepp"
"Bug Reports" = "https://github.com/neosubhamoy/pytubepp/issues"
[tool.setuptools.packages.find]
include = ["pytubepp*"]
[tool.setuptools]
license-files = ["LICENSE"]

53
pytubepp/config.py Normal file
View File

@@ -0,0 +1,53 @@
import os, json, platform, appdirs, tempfile
def get_download_folder():
system = platform.system()
if system in ["Windows", "Darwin", "Linux"]:
cli_download_dir = os.path.join(os.path.expanduser("~"), "Downloads", "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
else:
cli_download_dir = os.path.join(appdirs.user_download_dir(), "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
DEFAULT_CONFIG = {
'downloadDIR': get_download_folder(),
'defaultStream': 'max',
}
def get_temporary_directory():
temp_dir = tempfile.gettempdir()
cli_temp_dir = os.path.join(temp_dir, 'pytubepp')
os.makedirs(cli_temp_dir, exist_ok=True)
return cli_temp_dir
def load_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return json.load(f)
else:
return DEFAULT_CONFIG
def save_config(config):
config_dir = appdirs.user_config_dir('pytubepp')
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, 'config.json')
with open(config_path, 'w') as f:
json.dump(config, f, indent=4)
def update_config(key, value):
config = load_config()
config[key] = value
save_config(config)
def reset_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
os.remove(config_path)
print('\nConfig reset successful!')
else:
print('\nAlready using the default configs! Not resetting...!')

68
pytubepp/download.py Normal file
View File

@@ -0,0 +1,68 @@
from tqdm import tqdm
from .config import get_temporary_directory, load_config
from .utils import get_unique_filename
import os, re, requests, shutil, sys, random
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def download_progressive(stream, itag, title, resolution, file_extention, tempDIR=tempDIR, downloadDIR=downloadDIR):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag)
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video+Audio")
random_filename = str(random.randint(1000000000, 9999999999))
filename = random_filename + '_vdo.' + file_extention
output_temp_file = os.path.join(tempDIR, filename)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
selected_vdo.download(output_path=tempDIR, filename=filename)
print('Processing...')
shutil.move(output_temp_file, output_file)
print('Done! 🎉')
def download_nonprogressive(stream, itag_vdo, itag_ado, file_extention, output_path):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag_vdo)
selected_ado = stream.get_by_itag(itag_ado)
random_filename = str(random.randint(1000000000, 9999999999))
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video")
selected_vdo.download(output_path=output_path, filename=random_filename + '_vdo.' + file_extention)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.' + file_extention)
return random_filename
def download_audio(stream, itag, output_path):
global total_filesize, progress_bar
selected_ado = stream.get_by_itag(itag)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
random_filename = str(random.randint(1000000000, 9999999999))
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.mp4')
return random_filename
def download_thumbnail(url, file_path):
print('Downloading thumbnail...')
maxres_url = re.sub(r'/[^/]*\.jpg.*$', '/maxresdefault.jpg', url)
hq_url = re.sub(r'/[^/]*\.jpg.*$', '/hqdefault.jpg', url)
response = requests.get(maxres_url, stream=True)
if response.status_code != 200:
response = requests.get(hq_url, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as file:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, file)
else:
print('Failed to download thumbnail...!')
sys.exit()
def progress(chunk, file_handle, bytes_remaining):
chunk_size = total_filesize - bytes_remaining
progress_bar.update(chunk_size - progress_bar.n)
if bytes_remaining == 0:
progress_bar.close()

View File

@@ -1,76 +1,10 @@
from pytubefix import YouTube
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB
from tabulate import tabulate
from tqdm import tqdm
from importlib.metadata import version
import appdirs, ffmpy, requests, re, os, sys, random, shutil, platform, json, argparse, tempfile, subprocess
def network_available():
try:
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', 'youtube.com']
subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except subprocess.CalledProcessError:
return False
def get_version():
try:
return version('pytubepp')
except Exception as e:
return "Unknown"
def get_download_folder():
system = platform.system()
if system in ["Windows", "Darwin", "Linux"]:
cli_download_dir = os.path.join(os.path.expanduser("~"), "Downloads", "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
else:
cli_download_dir = os.path.join(appdirs.user_download_dir(), "PytubePP Downloads")
os.makedirs(cli_download_dir, exist_ok=True)
return cli_download_dir
def get_temporary_directory():
temp_dir = tempfile.gettempdir()
cli_temp_dir = os.path.join(temp_dir, 'pytubepp')
os.makedirs(cli_temp_dir, exist_ok=True)
return cli_temp_dir
DEFAULT_CONFIG = {
'downloadDIR': get_download_folder(),
'defaultStream': 'max',
}
def load_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return json.load(f)
else:
return DEFAULT_CONFIG
def save_config(config):
config_dir = appdirs.user_config_dir('pytubepp')
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, 'config.json')
with open(config_path, 'w') as f:
json.dump(config, f, indent=4)
def update_config(key, value):
config = load_config()
config[key] = value
save_config(config)
def reset_config():
config_dir = appdirs.user_config_dir('pytubepp')
config_path = os.path.join(config_dir, 'config.json')
if os.path.exists(config_path):
os.remove(config_path)
print('\nConfig reset successful!')
else:
print('\nAlready using the default configs! Not resetting...!')
from .config import get_temporary_directory, load_config, update_config, reset_config
from .download import download_progressive, download_nonprogressive, download_audio, progress
from .postprocess import merge_audio_video, convert_to_mp3
from .utils import get_version, clear_temp_files, is_valid_url, network_available
import appdirs, os, re, sys, argparse, json
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
@@ -79,167 +13,6 @@ configDIR = appdirs.user_config_dir('pytubepp')
defaultStream = userConfig['defaultStream']
version = get_version()
def merge_audio_video(title, resolution, file_extention, random_filename, tempDIR=tempDIR, downloadDIR=downloadDIR):
video_file = os.path.join(tempDIR, random_filename + '_vdo.' + file_extention)
audio_file = os.path.join(tempDIR, random_filename + '_ado.' + file_extention)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.' + file_extention)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
input_params = {video_file: None, audio_file: None}
output_params = {output_temp_file: ['-c:v', 'copy', '-c:a', 'copy']}
print('Processing...')
devnull = open(os.devnull, 'w')
ff = ffmpy.FFmpeg(inputs=input_params, outputs=output_params)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_vdo.' + file_extention, '_ado.' + file_extention, '_merged.' + file_extention], random_filename)
print('Done! 🎉')
def convert_to_mp3(title, thumbnail_url, random_filename, mp3_artist='Unknown', mp3_title='Unknown', mp3_album='Unknown', tempDIR=tempDIR, downloadDIR=downloadDIR):
image_file = os.path.join(tempDIR, random_filename + '_thumbnail.jpg')
download_thumbnail(thumbnail_url, image_file)
audio_file = os.path.join(tempDIR, random_filename + '_ado.mp4')
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_audio.mp3'))
print('Processing...')
devnull = open(os.devnull, 'w')
video_file = os.path.join(tempDIR, random_filename + '_thumbnail.mp4')
ff1 = ffmpy.FFmpeg(
inputs={image_file: '-loop 1 -t 1'},
outputs={video_file: '-vf "scale=1280:720" -r 1 -c:v libx264 -t 1'}
)
ff1.run(stdout=devnull, stderr=devnull)
merged_file = os.path.join(tempDIR, random_filename + '_merged.mp4')
ff2 = ffmpy.FFmpeg(
inputs={video_file: None, audio_file: None},
outputs={merged_file: '-c:v copy -c:a copy'}
)
ff2.run(stdout=devnull, stderr=devnull)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.mp3')
ff3 = ffmpy.FFmpeg(
inputs={merged_file: None},
outputs={output_temp_file: '-vn -c:a libmp3lame -q:a 2'}
)
ff3.run(stdout=devnull, stderr=devnull)
devnull.close()
audio = ID3(output_temp_file)
audio.add(TIT2(encoding=3, text=mp3_title))
audio.add(TPE1(encoding=3, text=mp3_artist))
audio.add(TALB(encoding=3, text=mp3_album))
with open(image_file, 'rb') as img:
audio.add(APIC(
encoding=3,
mime='image/jpeg',
type=3,
desc=u'Cover',
data=img.read()
))
audio.save()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_thumbnail.jpg', '_thumbnail.mp4', '_ado.mp4', '_merged.mp4'], random_filename)
print('Done! 🎉')
def download_progressive(stream, itag, title, resolution, file_extention, tempDIR=tempDIR, downloadDIR=downloadDIR):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag)
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video+Audio")
random_filename = str(random.randint(1000000000, 9999999999))
filename = random_filename + '_vdo.' + file_extention
output_temp_file = os.path.join(tempDIR, filename)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
selected_vdo.download(output_path=tempDIR, filename=filename)
print('Processing...')
shutil.move(output_temp_file, output_file)
print('Done! 🎉')
def download_nonprogressive(stream, itag_vdo, itag_ado, file_extention, output_path):
global total_filesize, progress_bar
selected_vdo = stream.get_by_itag(itag_vdo)
selected_ado = stream.get_by_itag(itag_ado)
random_filename = str(random.randint(1000000000, 9999999999))
total_filesize = selected_vdo.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Video")
selected_vdo.download(output_path=output_path, filename=random_filename + '_vdo.' + file_extention)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.' + file_extention)
return random_filename
def download_audio(stream, itag, output_path):
global total_filesize, progress_bar
selected_ado = stream.get_by_itag(itag)
total_filesize = selected_ado.filesize
progress_bar = tqdm(total=total_filesize, unit='B', unit_scale=True, desc="Downloading Audio")
random_filename = str(random.randint(1000000000, 9999999999))
selected_ado.download(output_path=output_path, filename=random_filename + '_ado.mp4')
return random_filename
def download_thumbnail(url, file_path):
print('Downloading thumbnail...')
maxres_url = re.sub(r'/[^/]*\.jpg.*$', '/maxresdefault.jpg', url)
hq_url = re.sub(r'/[^/]*\.jpg.*$', '/hqdefault.jpg', url)
response = requests.get(maxres_url, stream=True)
if response.status_code != 200:
response = requests.get(hq_url, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as file:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, file)
else:
print('Failed to download thumbnail...!')
sys.exit()
def progress(chunk, file_handle, bytes_remaining):
chunk_size = total_filesize - bytes_remaining
progress_bar.update(chunk_size - progress_bar.n)
if bytes_remaining == 0:
progress_bar.close()
def postprocess_cleanup(dir, files, random_filename):
for file in files:
file_path = os.path.join(dir, random_filename + file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(e)
def clear_temp_files():
if os.listdir(tempDIR) != []:
for file in os.listdir(tempDIR):
file_path = os.path.join(tempDIR, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
print(f'Removed: {file}')
except Exception as e:
print(e)
else:
print('No temporary files found to clear...!')
def get_unique_filename(filename, directory=downloadDIR):
base_name, extension = os.path.splitext(filename)
counter = 1
while os.path.exists(os.path.join(directory, filename)):
filename = f"{base_name} ({counter}){extension}"
counter += 1
return filename
def is_valid_url(url):
match = re.search(r"(https?://(?:www\.|music\.)?youtube\.com/watch\?v=[^&]{11}|https?://youtu\.be/[^?&]*(\?si=[^&]*)?)", url)
return match
def set_global_video_info(link):
if not network_available():
print('\nRequest timeout! Please check your network and try again...!!')
@@ -248,7 +21,7 @@ def set_global_video_info(link):
if is_valid_url(link):
global video, author, title, thumbnail, views, stream, stream_resolutions, maxres
link = is_valid_url(link).group(1)
video = YouTube(link, 'ANDROID_VR', on_progress_callback=progress)
video = YouTube(link, on_progress_callback=progress)
author = video.author
title = re.sub(r'[\\/*?:"<>|]', '_', author + ' - ' + video.title)
thumbnail = video.thumbnail_url
@@ -435,7 +208,7 @@ def show_video_info(link):
print('Sorry, No video streams found....!!!')
sys.exit()
print(f'\nTitle: {video.title}\nAuthor: {author}\nPublished On: {video.publish_date.strftime('%d/%m/%Y')}\nDuration: {video.length}\nViews: {views}\n')
print(f'\nTitle: {video.title}\nAuthor: {author}\nPublished On: {video.publish_date.strftime("%d/%m/%Y")}\nDuration: {video.length}\nViews: {views}\n')
print(tabulate(table, headers=['Stream', 'Alias (for -s flag)', 'Format', 'Size', 'FrameRate', 'V-Codec', 'A-Codec', 'V-BitRate', 'A-BitRate']))
print('\n')
else:
@@ -464,10 +237,11 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True if matching_stream.itag == 702 else False
if res == '2160p':
resolution = '2160p'
if stream.get_by_itag(701):
itag = '701'
itag = 701
type = stream.get_by_itag(701).mime_type
filesize = stream.get_by_itag(701).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(701).fps
@@ -475,6 +249,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(701).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
@@ -484,10 +259,11 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
is_hdr = False
elif res == '1440p':
resolution = '1440p'
if stream.get_by_itag(700):
itag = '700'
itag = 700
type = stream.get_by_itag(700).mime_type
filesize = stream.get_by_itag(700).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(700).fps
@@ -495,6 +271,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(700).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
@@ -504,10 +281,11 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
is_hdr = False
elif res == '1080p':
resolution = '1080p'
if stream.get_by_itag(699):
itag = '699'
itag = 699
type = stream.get_by_itag(699).mime_type
filesize = stream.get_by_itag(699).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(699).fps
@@ -515,6 +293,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(699).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
@@ -524,10 +303,11 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '720p':
resolution = '720p'
if stream.get_by_itag(698):
itag = '698'
itag = 698
type = stream.get_by_itag(698).mime_type
filesize = stream.get_by_itag(698).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(698).fps
@@ -535,6 +315,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(698).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
@@ -544,6 +325,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '480p':
itag = matching_stream.itag
resolution = '480p'
@@ -554,6 +336,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '360p':
itag = matching_stream.itag
resolution = '360p'
@@ -564,6 +347,7 @@ def show_raw_info(link, prettify=False):
ado_codec = matching_stream.audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = matching_stream.abr
is_hdr = False
elif res in ['240p', '144p']:
itag = matching_stream.itag
resolution = res
@@ -574,6 +358,7 @@ def show_raw_info(link, prettify=False):
ado_codec = stream.get_by_itag(139).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(139).abr
is_hdr = False
elif res == 'mp3':
itag = matching_stream.itag
resolution = 'mp3'
@@ -584,6 +369,7 @@ def show_raw_info(link, prettify=False):
ado_codec = matching_stream.audio_codec
vdo_bitrate = None
ado_bitrate = matching_stream.abr
is_hdr = False
else:
filesize = "N/A"
@@ -596,7 +382,8 @@ def show_raw_info(link, prettify=False):
'vcodec': vdo_codec,
'acodec': ado_codec,
'vbitrate': vdo_bitrate,
'abitrate': ado_bitrate
'abitrate': ado_bitrate,
'is_hdr': is_hdr
}
streams_list.append(current_stream)
@@ -606,6 +393,7 @@ def show_raw_info(link, prettify=False):
if prettify:
print(json.dumps({
'id': video.video_id,
'title': video.title,
'author': author,
'thumbnail_url': thumbnail,
@@ -616,6 +404,7 @@ def show_raw_info(link, prettify=False):
}, indent=4))
else:
print(json.dumps({
'id': video.video_id,
'title': video.title,
'author': author,
'thumbnail_url': thumbnail,
@@ -724,9 +513,8 @@ def download_stream(link, chosen_stream):
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def main():
parser = argparse.ArgumentParser(description=f'pytubePP (Pytube Post Processor) v{version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
parser = argparse.ArgumentParser(description=f'PytubePP (Pytube Post Processor) v{version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
parser.add_argument('url', nargs='?', default=None, help='url of the youtube video')
parser.add_argument('-df', '--download-folder', default=argparse.SUPPRESS, help='set custom download folder path (default: ~/Downloads/Pytube Downloads) [arg eg: "/path/to/folder"]')
parser.add_argument('-ds', '--default-stream', default=argparse.SUPPRESS, help='set default download stream (default: max) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, 4320p, mp3, max]')

76
pytubepp/postprocess.py Normal file
View File

@@ -0,0 +1,76 @@
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB
from .config import get_temporary_directory, load_config
from .utils import get_unique_filename, postprocess_cleanup
from .download import download_thumbnail
import os, shutil, ffmpy
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def merge_audio_video(title, resolution, file_extention, random_filename, tempDIR=tempDIR, downloadDIR=downloadDIR):
video_file = os.path.join(tempDIR, random_filename + '_vdo.' + file_extention)
audio_file = os.path.join(tempDIR, random_filename + '_ado.' + file_extention)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.' + file_extention)
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_' + resolution + '.' + file_extention))
input_params = {video_file: None, audio_file: None}
output_params = {output_temp_file: ['-c:v', 'copy', '-c:a', 'copy']}
print('Processing...')
devnull = open(os.devnull, 'w')
ff = ffmpy.FFmpeg(inputs=input_params, outputs=output_params)
ff.run(stdout=devnull, stderr=devnull)
devnull.close()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_vdo.' + file_extention, '_ado.' + file_extention, '_merged.' + file_extention], random_filename)
print('Done! 🎉')
def convert_to_mp3(title, thumbnail_url, random_filename, mp3_artist='Unknown', mp3_title='Unknown', mp3_album='Unknown', tempDIR=tempDIR, downloadDIR=downloadDIR):
image_file = os.path.join(tempDIR, random_filename + '_thumbnail.jpg')
download_thumbnail(thumbnail_url, image_file)
audio_file = os.path.join(tempDIR, random_filename + '_ado.mp4')
output_file = os.path.join(downloadDIR, get_unique_filename(title + '_audio.mp3'))
print('Processing...')
devnull = open(os.devnull, 'w')
video_file = os.path.join(tempDIR, random_filename + '_thumbnail.mp4')
ff1 = ffmpy.FFmpeg(
inputs={image_file: '-loop 1 -t 1'},
outputs={video_file: '-vf "scale=1280:720" -r 1 -c:v libx264 -t 1'}
)
ff1.run(stdout=devnull, stderr=devnull)
merged_file = os.path.join(tempDIR, random_filename + '_merged.mp4')
ff2 = ffmpy.FFmpeg(
inputs={video_file: None, audio_file: None},
outputs={merged_file: '-c:v copy -c:a copy'}
)
ff2.run(stdout=devnull, stderr=devnull)
output_temp_file = os.path.join(tempDIR, random_filename + '_merged.mp3')
ff3 = ffmpy.FFmpeg(
inputs={merged_file: None},
outputs={output_temp_file: '-vn -c:a libmp3lame -q:a 2'}
)
ff3.run(stdout=devnull, stderr=devnull)
devnull.close()
audio = ID3(output_temp_file)
audio.add(TIT2(encoding=3, text=mp3_title))
audio.add(TPE1(encoding=3, text=mp3_artist))
audio.add(TALB(encoding=3, text=mp3_album))
with open(image_file, 'rb') as img:
audio.add(APIC(
encoding=3,
mime='image/jpeg',
type=3,
desc=u'Cover',
data=img.read()
))
audio.save()
shutil.move(output_temp_file, output_file)
postprocess_cleanup(tempDIR, ['_thumbnail.jpg', '_thumbnail.mp4', '_ado.mp4', '_merged.mp4'], random_filename)
print('Done! 🎉')

56
pytubepp/utils.py Normal file
View File

@@ -0,0 +1,56 @@
from importlib.metadata import version
from .config import load_config, get_temporary_directory
import os, re, subprocess, platform
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
def network_available():
try:
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', 'youtube.com']
subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except subprocess.CalledProcessError:
return False
def get_version():
try:
return version('pytubepp')
except Exception as e:
return "Unknown"
def is_valid_url(url):
match = re.search(r"(https?://(?:www\.|music\.)?youtube\.com/watch\?v=[^&]{11}|https?://youtu\.be/[^?&]*(\?si=[^&]*)?)", url)
return match
def get_unique_filename(filename, directory=downloadDIR):
base_name, extension = os.path.splitext(filename)
counter = 1
while os.path.exists(os.path.join(directory, filename)):
filename = f"{base_name} ({counter}){extension}"
counter += 1
return filename
def postprocess_cleanup(dir, files, random_filename):
for file in files:
file_path = os.path.join(dir, random_filename + file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(e)
def clear_temp_files():
if os.listdir(tempDIR) != []:
for file in os.listdir(tempDIR):
file_path = os.path.join(tempDIR, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
print(f'Removed: {file}')
except Exception as e:
print(e)
else:
print('No temporary files found to clear...!')

View File

@@ -7,4 +7,5 @@ tqdm
appdirs
setuptools
wheel
twine
twine
build

View File

@@ -1,59 +0,0 @@
from setuptools import setup, find_packages
with open('README.md', 'r', encoding='utf8') as file:
readme = file.read()
setup(
name='pytubepp',
version='1.0.7',
description='A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!',
long_description=readme,
long_description_content_type='text/markdown',
author='Subhamoy Biswas',
author_email='hey@neosubhamoy.com',
license='MIT',
packages=find_packages(),
python_requires=">=3.8",
url="https://github.com/neosubhamoy/pytubepp",
entry_points={
'console_scripts': [
'pytubepp=pytubepp.main:main',
],
},
install_requires=[
'pytubefix',
'requests',
'ffmpy',
'mutagen',
'tabulate',
'tqdm',
'appdirs',
'setuptools',
],
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python",
"Topic :: Internet",
"Topic :: Multimedia :: Video",
"Topic :: Multimedia :: Sound/Audio :: Conversion",
"Topic :: Multimedia :: Video :: Conversion",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Terminals",
"Topic :: Utilities",
],
keywords=["youtube", "download", "video", "pytube", "cli"],
project_urls={
"Bug Reports": "https://github.com/neosubhamoy/pytubepp/issues",
},
)