1
1
mirror of https://github.com/neosubhamoy/pytubepp.git synced 2025-12-20 01:09:36 +05:30

(refactor): optimized algorithms with OOP approach

This commit is contained in:
2025-01-05 20:59:20 +05:30
parent 8a34e8aef4
commit b1b0a85108

View File

@@ -6,515 +6,291 @@ from .postprocess import merge_audio_video, convert_to_mp3
from .utils import get_version, clear_temp_files, is_valid_url, network_available
import appdirs, os, re, sys, argparse, json
userConfig = load_config()
downloadDIR = userConfig['downloadDIR']
tempDIR = get_temporary_directory()
configDIR = appdirs.user_config_dir('pytubepp')
defaultStream = userConfig['defaultStream']
version = get_version()
class YouTubeDownloader:
def __init__(self):
self.user_config = load_config()
self.download_dir = self.user_config['downloadDIR']
self.temp_dir = get_temporary_directory()
self.config_dir = appdirs.user_config_dir('pytubepp')
self.default_stream = self.user_config['defaultStream']
self.version = get_version()
def set_global_video_info(link):
# Video attributes
self.video = None
self.author = None
self.title = None
self.thumbnail = None
self.views = None
self.stream = None
self.maxres = None
self.stream_resolutions = {
'4320p': {'allowed_streams': ['8k', '4320', '4320p'], 'message': ['4320p', '[8k, 4320, 4320p]']},
'2160p': {'allowed_streams': ['4k', '2160', '2160p'], 'message': ['2160p', '[4k, 2160, 2160p]']},
'1440p': {'allowed_streams': ['2k', '1440', '1440p'], 'message': ['1440p', '[2k, 1440, 1440p]']},
'1080p': {'allowed_streams': ['fhd', '1080', '1080p'], 'message': ['1080p', '[fhd, 1080, 1080p]']},
'720p': {'allowed_streams': ['hd', '720', '720p'], 'message': ['720p', '[hd, 720, 720p]']},
'480p': {'allowed_streams': ['480', '480p'], 'message': ['480p', '[480, 480p]']},
'360p': {'allowed_streams': ['360', '360p'], 'message': ['360p', '[360, 360p]']},
'240p': {'allowed_streams': ['240', '240p'], 'message': ['240p', '[240, 240p]']},
'144p': {'allowed_streams': ['144', '144p'], 'message': ['144p', '[144, 144p]']},
'mp3': {'allowed_streams': ['mp3'], 'message': ['mp3', '[mp3]']}
}
def set_video_info(self, link):
if not network_available():
print('\nRequest timeout! Please check your network and try again...!!')
sys.exit()
if is_valid_url(link):
global video, author, title, thumbnail, views, stream, stream_resolutions, maxres
link = is_valid_url(link).group(1)
video = YouTube(link, on_progress_callback=progress)
author = video.author
title = re.sub(r'[\\/*?:"<>|]', '_', author + ' - ' + video.title)
thumbnail = video.thumbnail_url
views = str(video.views)
stream = video.streams
stream_resolutions = {
'4320p': {
'allowed_streams': ['8k', '4320', '4320p'],
'message': ['4320p', '[8k, 4320, 4320p]']
},
'2160p': {
'allowed_streams': ['4k', '2160', '2160p'],
'message': ['2160p', '[4k, 2160, 2160p]']
},
'1440p': {
'allowed_streams': ['2k', '1440', '1440p'],
'message': ['1440p', '[2k, 1440, 1440p]']
},
'1080p': {
'allowed_streams': ['fhd', '1080', '1080p'],
'message': ['1080p', '[fhd, 1080, 1080p]']
},
'720p': {
'allowed_streams': ['hd', '720', '720p'],
'message': ['720p', '[hd, 720, 720p]']
},
'480p': {
'allowed_streams': ['480', '480p'],
'message': ['480p', '[480, 480p]']
},
'360p': {
'allowed_streams': ['360', '360p'],
'message': ['360p', '[360, 360p]']
},
'240p': {
'allowed_streams': ['240', '240p'],
'message': ['240p', '[240, 240p]']
},
'144p': {
'allowed_streams': ['144', '144p'],
'message': ['144p', '[144, 144p]']
},
'mp3': {
'allowed_streams': ['mp3'],
'message': ['mp3', '[mp3]']
}
}
for res in stream_resolutions.keys():
if res != 'mp3' and stream.filter(res=res):
maxres = res
self.video = YouTube(link, on_progress_callback=progress)
self.author = self.video.author
self.title = re.sub(r'[\\/*?:"<>|]', '_', self.author + ' - ' + self.video.title)
self.thumbnail = self.video.thumbnail_url
self.views = str(self.video.views)
self.stream = self.video.streams
# Find maximum resolution
for res in self.stream_resolutions.keys():
if res != 'mp3' and self.stream.filter(res=res):
self.maxres = res
break
return True
else:
return False
def show_video_info(link):
if set_global_video_info(link):
def get_stream_info(self, res, matching_stream):
"""Helper method to get stream information based on resolution"""
stream_info = {}
if res == 'mp3':
stream_info = {
'type': "audio/mp3",
'filesize': f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB",
'raw_filesize': matching_stream.filesize,
'fps': None,
'raw_fps': None,
'vdo_codec': None,
'ado_codec': matching_stream.audio_codec,
'vdo_bitrate': None,
'ado_bitrate': matching_stream.abr
}
elif res == '360p':
stream_info = {
'type': matching_stream.mime_type,
'filesize': f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB",
'raw_filesize': matching_stream.filesize,
'fps': f"{matching_stream.fps}fps",
'raw_fps': matching_stream.fps,
'vdo_codec': matching_stream.video_codec,
'ado_codec': matching_stream.audio_codec,
'vdo_bitrate': f"{matching_stream.bitrate / 1024:.0f}kbps",
'ado_bitrate': matching_stream.abr
}
else:
_select_suitable_audio_stream = lambda stream: 139 if stream.itag in [160, 133] else (251 if stream.mime_type == 'video/webm' else 140)
# Check for HDR variants first
hdr_stream = None
if res in ['4320p', '2160p', '1440p', '1080p', '720p']:
hdr_itags = {'4320p': 702, '2160p': 701, '1440p': 700, '1080p': 699, '720p': 698}
hdr_stream = self.stream.get_by_itag(hdr_itags.get(res))
# Use HDR stream if available, otherwise use the original stream
final_stream = hdr_stream if hdr_stream else matching_stream
audio_stream = self.stream.get_by_itag(_select_suitable_audio_stream(final_stream))
total_size = final_stream.filesize + audio_stream.filesize
stream_info = {
'type': final_stream.mime_type,
'filesize': f"{total_size / (1024 * 1024 * 1024):.2f} GB" if total_size >= 1073741824 else f"{total_size / (1024 * 1024):.2f} MB",
'raw_filesize': total_size,
'fps': f"{final_stream.fps}fps",
'raw_fps': final_stream.fps,
'vdo_codec': final_stream.video_codec,
'ado_codec': audio_stream.audio_codec,
'vdo_bitrate': f"{final_stream.bitrate / 1024:.0f}kbps",
'ado_bitrate': audio_stream.abr,
'is_hdr': bool(hdr_stream), # Track if this is an HDR stream
'stream_itag': final_stream.itag # Track the actual itag being used
}
return stream_info
def show_video_info(self, link):
if self.set_video_info(link):
table = []
found = False
for res in stream_resolutions.keys():
if found or (res not in ['mp3'] and stream.filter(res=res)) or (res == 'mp3' and stream.get_by_itag(140)):
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
if res == 'mp3':
matching_stream = stream.get_by_itag(140)
else:
matching_stream = next((s for s in stream if s.resolution == res), None)
if matching_stream is not None:
if res == '4320p':
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
if res == '2160p':
if stream.get_by_itag(701):
type = stream.get_by_itag(701).mime_type
filesize = f"{(stream.get_by_itag(701).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if stream.get_by_itag(701).filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(stream.get_by_itag(701).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{stream.get_by_itag(701).fps}fps"
vdo_codec = stream.get_by_itag(701).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(701).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
else:
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(251).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(251).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(251).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
elif res == '1440p':
if stream.get_by_itag(700):
type = stream.get_by_itag(700).mime_type
filesize = f"{(stream.get_by_itag(700).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if stream.get_by_itag(700).filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(stream.get_by_itag(700).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{stream.get_by_itag(700).fps}fps"
vdo_codec = stream.get_by_itag(700).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(700).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
else:
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(251).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(251).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(251).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
elif res == '1080p':
if stream.get_by_itag(699):
type = stream.get_by_itag(699).mime_type
filesize = f"{(stream.get_by_itag(699).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if stream.get_by_itag(699).filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(stream.get_by_itag(699).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{stream.get_by_itag(699).fps}fps"
vdo_codec = stream.get_by_itag(699).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(699).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
else:
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
elif res == '720p':
if stream.get_by_itag(698):
type = stream.get_by_itag(698).mime_type
filesize = f"{(stream.get_by_itag(698).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if stream.get_by_itag(698).filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(stream.get_by_itag(698).filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{stream.get_by_itag(698).fps}fps"
vdo_codec = stream.get_by_itag(698).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(698).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
else:
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
elif res == '480p':
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(140).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(140).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
elif res == '360p':
type = matching_stream.mime_type
filesize = f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = matching_stream.audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = matching_stream.abr
elif res in ['240p', '144p']:
type = matching_stream.mime_type
filesize = f"{(matching_stream.filesize + stream.get_by_itag(139).filesize) / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize + stream.get_by_itag(139).filesize >= 1073741824 else f"{(matching_stream.filesize + stream.get_by_itag(139).filesize) / (1024 * 1024):.2f} MB"
fps = f"{matching_stream.fps}fps"
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(139).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(139).abr
elif res == 'mp3':
type = "audio/mp3"
filesize = f"{matching_stream.filesize / (1024 * 1024 * 1024):.2f} GB" if matching_stream.filesize >= 1073741824 else f"{matching_stream.filesize / (1024 * 1024):.2f} MB"
fps = "none"
vdo_codec = "none"
ado_codec = matching_stream.audio_codec
vdo_bitrate = "none"
ado_bitrate = matching_stream.abr
matching_stream = self.stream.get_by_itag(140) if res == 'mp3' else next((s for s in self.stream if s.resolution == res), None)
else:
filesize = "N/A"
message = stream_resolutions[res]['message'] + [type] + [filesize] + [fps] + [vdo_codec] + [ado_codec] + [vdo_bitrate] + [ado_bitrate]
if matching_stream:
stream_info = self.get_stream_info(res, matching_stream)
message = self.stream_resolutions[res]['message'] + [
stream_info['type'],
stream_info['filesize'],
stream_info['fps'] if stream_info['fps'] else "none",
stream_info['vdo_codec'] if stream_info['vdo_codec'] else "none",
stream_info['ado_codec'],
stream_info['vdo_bitrate'] if stream_info['vdo_bitrate'] else "none",
stream_info['ado_bitrate']
]
table.append(message)
if not found:
print('Sorry, No video streams found....!!!')
sys.exit()
print(f'\nTitle: {video.title}\nAuthor: {author}\nPublished On: {video.publish_date.strftime("%d/%m/%Y")}\nDuration: {video.length}\nViews: {views}\n')
print(f'\nTitle: {self.video.title}\nAuthor: {self.author}\nPublished On: {self.video.publish_date.strftime("%d/%m/%Y")}\nDuration: {self.video.length}\nViews: {self.views}\n')
print(tabulate(table, headers=['Stream', 'Alias (for -s flag)', 'Format', 'Size', 'FrameRate', 'V-Codec', 'A-Codec', 'V-BitRate', 'A-BitRate']))
print('\n')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def show_raw_info(link, prettify=False):
if set_global_video_info(link):
def show_raw_info(self, link, prettify=False):
if self.set_video_info(link):
streams_list = []
found = False
for res in stream_resolutions.keys():
if found or (res not in ['mp3'] and stream.filter(res=res)) or (res == 'mp3' and stream.get_by_itag(140)):
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
if res == 'mp3':
matching_stream = stream.get_by_itag(140)
else:
matching_stream = next((s for s in stream if s.resolution == res), None)
if matching_stream is not None:
if res == '4320p':
itag = matching_stream.itag
resolution = '4320p'
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(140).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True if matching_stream.itag == 702 else False
if res == '2160p':
resolution = '2160p'
if stream.get_by_itag(701):
itag = 701
type = stream.get_by_itag(701).mime_type
filesize = stream.get_by_itag(701).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(701).fps
vdo_codec = stream.get_by_itag(701).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(701).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(251).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
is_hdr = False
elif res == '1440p':
resolution = '1440p'
if stream.get_by_itag(700):
itag = 700
type = stream.get_by_itag(700).mime_type
filesize = stream.get_by_itag(700).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(700).fps
vdo_codec = stream.get_by_itag(700).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(700).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(251).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(251).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(251).abr
is_hdr = False
elif res == '1080p':
resolution = '1080p'
if stream.get_by_itag(699):
itag = 699
type = stream.get_by_itag(699).mime_type
filesize = stream.get_by_itag(699).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(699).fps
vdo_codec = stream.get_by_itag(699).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(699).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(140).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '720p':
resolution = '720p'
if stream.get_by_itag(698):
itag = 698
type = stream.get_by_itag(698).mime_type
filesize = stream.get_by_itag(698).filesize + stream.get_by_itag(140).filesize
fps = stream.get_by_itag(698).fps
vdo_codec = stream.get_by_itag(698).video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{stream.get_by_itag(698).bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = True
else:
itag = matching_stream.itag
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(140).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '480p':
itag = matching_stream.itag
resolution = '480p'
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(140).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(140).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(140).abr
is_hdr = False
elif res == '360p':
itag = matching_stream.itag
resolution = '360p'
type = matching_stream.mime_type
filesize = matching_stream.filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = matching_stream.audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = matching_stream.abr
is_hdr = False
elif res in ['240p', '144p']:
itag = matching_stream.itag
resolution = res
type = matching_stream.mime_type
filesize = matching_stream.filesize + stream.get_by_itag(139).filesize
fps = matching_stream.fps
vdo_codec = matching_stream.video_codec
ado_codec = stream.get_by_itag(139).audio_codec
vdo_bitrate = f"{matching_stream.bitrate / 1024:.0f}kbps"
ado_bitrate = stream.get_by_itag(139).abr
is_hdr = False
elif res == 'mp3':
itag = matching_stream.itag
resolution = 'mp3'
type = "audio/mp3"
filesize = matching_stream.filesize
fps = None
vdo_codec = None
ado_codec = matching_stream.audio_codec
vdo_bitrate = None
ado_bitrate = matching_stream.abr
is_hdr = False
matching_stream = self.stream.get_by_itag(140) if res == 'mp3' else next((s for s in self.stream if s.resolution == res), None)
else:
filesize = "N/A"
current_stream = {
'itag': itag,
'res': resolution,
'mime_type': type,
'file_size': filesize,
'fps': fps,
'vcodec': vdo_codec,
'acodec': ado_codec,
'vbitrate': vdo_bitrate,
'abitrate': ado_bitrate,
'is_hdr': is_hdr
}
streams_list.append(current_stream)
if matching_stream:
stream_info = self.get_stream_info(res, matching_stream)
streams_list.append({
'itag': stream_info.get('stream_itag', matching_stream.itag),
'res': res,
'mime_type': stream_info['type'],
'file_size': stream_info['raw_filesize'],
'fps': stream_info['raw_fps'],
'vcodec': stream_info['vdo_codec'],
'acodec': stream_info['ado_codec'],
'vbitrate': stream_info['vdo_bitrate'],
'abitrate': stream_info['ado_bitrate'],
'is_hdr': stream_info.get('is_hdr', False)
})
if not found:
print('Sorry, No video streams found....!!!')
sys.exit()
if prettify:
print(json.dumps({
'id': video.video_id,
'title': video.title,
'author': author,
'thumbnail_url': thumbnail,
'views': video.views,
'published_on': video.publish_date.strftime('%d/%m/%Y'),
'duration': video.length,
output = {
'id': self.video.video_id,
'title': self.video.title,
'author': self.author,
'thumbnail_url': self.thumbnail,
'views': self.video.views,
'published_on': self.video.publish_date.strftime('%d/%m/%Y'),
'duration': self.video.length,
'streams': streams_list,
}, indent=4))
else:
print(json.dumps({
'id': video.video_id,
'title': video.title,
'author': author,
'thumbnail_url': thumbnail,
'views': video.views,
'published_on': video.publish_date.strftime('%d/%m/%Y'),
'duration': video.length,
'streams': streams_list,
}))
}
print(json.dumps(output, indent=4 if prettify else None))
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def get_allowed_streams(link):
if set_global_video_info(link):
def get_allowed_streams(self, link):
if self.set_video_info(link):
allowed_streams = []
found = False
for res in stream_resolutions.keys():
if found or (res not in ['mp3'] and stream.filter(res=res)) or (res == 'mp3' and stream.get_by_itag(140)):
for res in self.stream_resolutions.keys():
if found or (res not in ['mp3'] and self.stream.filter(res=res)) or (res == 'mp3' and self.stream.get_by_itag(140)):
found = True
allowed_streams.extend(stream_resolutions[res]['allowed_streams'])
allowed_streams.extend(self.stream_resolutions[res]['allowed_streams'])
return allowed_streams
else:
print('\nInvalid video link! Please enter a valid video url...!!')
return []
def print_short_info(chosen_stream):
if chosen_stream in ['720', '720p', 'hd']:
print(f'\nVideo: {title}\nSelected Stream: 720p (HD)\n')
elif chosen_stream in ['360', '360p']:
print(f'\nVideo: {title}\nSelected Stream: 360p (SD)\n')
elif chosen_stream in ['1080', '1080p', 'fhd']:
print(f'\nVideo: {title}\nSelected Stream: 1080p (FHD)\n')
elif chosen_stream in ['480', '480p']:
print(f'\nVideo: {title}\nSelected Stream: 480p (SD)\n')
elif chosen_stream in ['240', '240p']:
print(f'\nVideo: {title}\nSelected Stream: 240p (LD)\n')
elif chosen_stream in ['144', '144p']:
print(f'\nVideo: {title}\nSelected Stream: 144p (LD)\n')
elif chosen_stream in ['4320', '4320p', '8k']:
print(f'\nVideo: {title}\nSelected Stream: 4320p (8K)\n')
elif chosen_stream in ['2160', '2160p', '4k']:
print(f'\nVideo: {title}\nSelected Stream: 2160p (4K)\n')
elif chosen_stream in ['1440', '1440p', '2k']:
print(f'\nVideo: {title}\nSelected Stream: 1440p (2K)\n')
elif chosen_stream == 'mp3':
print(f'\nVideo: {title}\nSelected Stream: mp3 (Audio)\n')
def print_short_info(self, chosen_stream):
resolution_map = {
'4320': '4320p (8K)', '4320p': '4320p (8K)', '8k': '4320p (8K)',
'2160': '2160p (4K)', '2160p': '2160p (4K)', '4k': '2160p (4K)',
'1440': '1440p (2K)', '1440p': '1440p (2K)', '2k': '1440p (2K)',
'1080': '1080p (FHD)', '1080p': '1080p (FHD)', 'fhd': '1080p (FHD)',
'720': '720p (HD)', '720p': '720p (HD)', 'hd': '720p (HD)',
'480': '480p (SD)', '480p': '480p (SD)',
'360': '360p (SD)', '360p': '360p (SD)',
'240': '240p (LD)', '240p': '240p (LD)',
'144': '144p (LD)', '144p': '144p (LD)',
'mp3': 'mp3 (Audio)'
}
print(f'\nTitle: {self.title}\nSelected Stream: {resolution_map.get(chosen_stream, "Unknown")}\n')
def download_stream(self, link, chosen_stream):
if self.set_video_info(link):
self.print_short_info(chosen_stream)
allowed_streams = self.get_allowed_streams(link)
def download_stream(link, chosen_stream):
if set_global_video_info(link):
print_short_info(chosen_stream)
allowed_streams = get_allowed_streams(link)
if chosen_stream in allowed_streams:
if chosen_stream in ['360', '360p']:
download_progressive(stream, 18, title, '360p', 'mp4')
download_progressive(self.stream, 18, self.title, '360p', 'mp4')
elif chosen_stream in ['1080', '1080p', 'fhd']:
if stream.get_by_itag(699):
merge_audio_video(title, '1080p', 'mp4', download_nonprogressive(stream, 699, 140, 'mp4', tempDIR))
elif stream.get_by_itag(299):
merge_audio_video(title, '1080p', 'mp4', download_nonprogressive(stream, 299, 140, 'mp4', tempDIR))
elif stream.get_by_itag(137):
merge_audio_video(title, '1080p', 'mp4', download_nonprogressive(stream, 137, 140, 'mp4', tempDIR))
self._handle_1080p_download()
elif chosen_stream in ['720', '720p', 'hd']:
if stream.get_by_itag(698):
merge_audio_video(title, '720p', 'mp4', download_nonprogressive(stream, 698, 140, 'mp4', tempDIR))
elif stream.get_by_itag(298):
merge_audio_video(title, '720p', 'mp4', download_nonprogressive(stream, 298, 140, 'mp4', tempDIR))
elif stream.get_by_itag(136):
merge_audio_video(title, '720p', 'mp4', download_nonprogressive(stream, 136, 140, 'mp4', tempDIR))
self._handle_720p_download()
elif chosen_stream in ['480', '480p']:
merge_audio_video(title, '480p', 'mp4', download_nonprogressive(stream, 135, 140, 'mp4', tempDIR))
merge_audio_video(self.title, '480p', 'mp4', download_nonprogressive(self.stream, 135, 140, 'mp4', self.temp_dir))
elif chosen_stream in ['240', '240p']:
merge_audio_video(title, '240p', 'mp4', download_nonprogressive(stream, 133, 139, 'mp4', tempDIR))
merge_audio_video(self.title, '240p', 'mp4', download_nonprogressive(self.stream, 133, 139, 'mp4', self.temp_dir))
elif chosen_stream in ['144', '144p']:
merge_audio_video(title, '144p', 'mp4', download_nonprogressive(stream, 160, 139, 'mp4', tempDIR))
merge_audio_video(self.title, '144p', 'mp4', download_nonprogressive(self.stream, 160, 139, 'mp4', self.temp_dir))
elif chosen_stream in ['4320', '4320p', '8k']:
if stream.get_by_itag(702):
merge_audio_video(title, '8k', 'mp4', download_nonprogressive(stream, 702, 140, 'mp4', tempDIR))
elif stream.get_by_itag(571):
merge_audio_video(title, '8k', 'mp4', download_nonprogressive(stream, 571, 140, 'mp4', tempDIR))
self._handle_4320p_download()
elif chosen_stream in ['2160', '2160p', '4k']:
if stream.get_by_itag(701):
merge_audio_video(title, '4k', 'mp4', download_nonprogressive(stream, 701, 140, 'mp4', tempDIR))
elif stream.get_by_itag(315):
merge_audio_video(title, '4k', 'webm', download_nonprogressive(stream, 315, 251, 'webm', tempDIR))
elif stream.get_by_itag(313):
merge_audio_video(title, '4k', 'webm', download_nonprogressive(stream, 313, 251, 'webm', tempDIR))
self._handle_2160p_download()
elif chosen_stream in ['1440', '1440p', '2k']:
if stream.get_by_itag(700):
merge_audio_video(title, '2k', 'mp4', download_nonprogressive(stream, 700, 140, 'mp4', tempDIR))
elif stream.get_by_itag(308):
merge_audio_video(title, '2k', 'webm', download_nonprogressive(stream, 308, 251, 'webm', tempDIR))
elif stream.get_by_itag(271):
merge_audio_video(title, '2k', 'webm', download_nonprogressive(stream, 271, 251, 'webm', tempDIR))
self._handle_1440p_download()
elif chosen_stream == 'mp3':
convert_to_mp3(title, thumbnail, download_audio(stream, 140, tempDIR), author, video.title, author)
convert_to_mp3(self.title, self.thumbnail, download_audio(self.stream, 140, self.temp_dir), self.author, self.video.title, self.author)
else:
print('\nInvalid download stream or stream not available! Please choose a different stream...!! (use -i to see available streams)')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
def _handle_4320p_download(self):
if self.stream.get_by_itag(702):
merge_audio_video(self.title, '8k', 'mp4', download_nonprogressive(self.stream, 702, 140, 'mp4', self.temp_dir))
elif self.stream.get_by_itag(571):
merge_audio_video(self.title, '8k', 'mp4', download_nonprogressive(self.stream, 571, 140, 'mp4', self.temp_dir))
def _handle_2160p_download(self):
if self.stream.get_by_itag(701):
merge_audio_video(self.title, '4k', 'mp4', download_nonprogressive(self.stream, 701, 140, 'mp4', self.temp_dir))
elif self.stream.get_by_itag(315):
merge_audio_video(self.title, '4k', 'webm', download_nonprogressive(self.stream, 315, 251, 'webm', self.temp_dir))
elif self.stream.get_by_itag(313):
merge_audio_video(self.title, '4k', 'webm', download_nonprogressive(self.stream, 313, 251, 'webm', self.temp_dir))
def _handle_1440p_download(self):
if self.stream.get_by_itag(700):
merge_audio_video(self.title, '2k', 'mp4', download_nonprogressive(self.stream, 700, 140, 'mp4', self.temp_dir))
elif self.stream.get_by_itag(308):
merge_audio_video(self.title, '2k', 'webm', download_nonprogressive(self.stream, 308, 251, 'webm', self.temp_dir))
elif self.stream.get_by_itag(271):
merge_audio_video(self.title, '2k', 'webm', download_nonprogressive(self.stream, 271, 251, 'webm', self.temp_dir))
def _handle_1080p_download(self):
if self.stream.get_by_itag(699):
merge_audio_video(self.title, '1080p', 'mp4', download_nonprogressive(self.stream, 699, 140, 'mp4', self.temp_dir))
else:
merge_audio_video(self.title, '1080p', 'mp4', download_nonprogressive(self.stream, 137, 140, 'mp4', self.temp_dir))
def _handle_720p_download(self):
if self.stream.get_by_itag(698):
merge_audio_video(self.title, '720p', 'mp4', download_nonprogressive(self.stream, 698, 140, 'mp4', self.temp_dir))
else:
merge_audio_video(self.title, '720p', 'mp4', download_nonprogressive(self.stream, 136, 140, 'mp4', self.temp_dir))
def main():
parser = argparse.ArgumentParser(description=f'PytubePP (Pytube Post Processor) v{version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
downloader = YouTubeDownloader()
parser = argparse.ArgumentParser(description=f'PytubePP (Pytube Post Processor) v{downloader.version} - A Simple CLI Tool to Download Your Favorite YouTube Videos Effortlessly!')
parser.add_argument('url', nargs='?', default=None, help='url of the youtube video')
parser.add_argument('-df', '--download-folder', default=argparse.SUPPRESS, help='set custom download folder path (default: ~/Downloads/Pytube Downloads) [arg eg: "/path/to/folder"]')
parser.add_argument('-ds', '--default-stream', default=argparse.SUPPRESS, help='set default download stream (default: max) [available arguments: 144p, 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p, 4320p, mp3, max]')
@@ -534,62 +310,55 @@ def main():
sys.exit(1)
if args.url:
if 'download_folder' in args:
if not is_valid_url(args.url):
print('\nInvalid video link! Please enter a valid video url...!!')
sys.exit()
# Handle warning messages for ignored flags
if hasattr(args, 'download_folder'):
print('\nVideo url supplied! igonering -df flag...!!')
if 'default_stream' in args:
if hasattr(args, 'default_stream'):
print('\nVideo url supplied! ignoreing -ds flag...!!')
if args.reset_default:
print('\nVideo url supplied! ignoreing -r flag...!!')
if args.clear_temp:
print('\nVideo url supplied! ignoreing -ct flag...!!')
if args.show_config:
print('\nVideo url supplied! ignoreing -sc flag...!!')
# Handle info display flags
if args.show_info:
show_video_info(args.url)
downloader.show_video_info(args.url)
if args.raw_info:
if args.json_prettify:
show_raw_info(args.url, True)
else:
show_raw_info(args.url)
downloader.show_raw_info(args.url, args.json_prettify)
if args.json_prettify and not args.raw_info:
print('\nMissing flag! -jp flag must be used with a flag which returns json data...!! (eg: -ri)')
if 'stream' in args:
download_stream(args.url, args.stream)
if 'stream' not in args and not args.show_info and not args.raw_info and not args.json_prettify:
if set_global_video_info(args.url):
if defaultStream == 'max' and maxres != None:
download_stream(args.url, maxres)
return
if (defaultStream == 'mp3' and stream.get_by_itag(140)) or (defaultStream != 'max' and stream.filter(res=defaultStream)):
download_stream(args.url, defaultStream)
# Handle download cases
if hasattr(args, 'stream'):
downloader.download_stream(args.url, args.stream)
elif not any([args.show_info, args.raw_info, args.json_prettify]): # If no info flags are set
if downloader.set_video_info(args.url):
if downloader.default_stream == 'max' and downloader.maxres:
downloader.download_stream(args.url, downloader.maxres)
elif (downloader.default_stream == 'mp3' and downloader.stream.get_by_itag(140)) or (downloader.default_stream != 'max' and downloader.stream.filter(res=downloader.default_stream)):
downloader.download_stream(args.url, downloader.default_stream)
else:
if maxres != None:
print(f'\nDefault stream not available! ( Default: {defaultStream} | Available: {maxres} )')
if downloader.maxres:
print(f'\nDefault stream not available! ( Default: {downloader.default_stream} | Available: {downloader.maxres} )')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
while answer not in ['yes', 'y', 'no', 'n']:
print('Invalid answer! try again...!! answer with: [yes/y/no/n]')
answer = input('Do you want to download the maximum available stream ? [yes/no]\n')
if answer in ['yes', 'y']:
download_stream(args.url, maxres)
downloader.download_stream(args.url, downloader.maxres)
else:
print('Download cancelled! exiting...!!')
else:
print('Sorry, No downloadable video stream found....!!!')
else:
print('\nInvalid video link! Please enter a valid video url...!!')
else:
if 'download_folder' in args:
if args.download_folder != downloadDIR:
if hasattr(args, 'download_folder'):
if args.download_folder != downloader.download_dir:
if os.path.isdir(args.download_folder):
update_config('downloadDIR', args.download_folder)
os.makedirs(args.download_folder, exist_ok=True)
@@ -599,8 +368,8 @@ def main():
else:
print('\nDownload folder path is the same! Not updating...!!')
if 'default_stream' in args:
if args.default_stream != defaultStream:
if hasattr(args, 'default_stream'):
if args.default_stream != downloader.default_stream:
if args.default_stream in ['144p', '240p', '360p', '480p', '720p', '1080p', '1440p', '2160p', '4320p', 'mp3', 'max']:
update_config('defaultStream', args.default_stream)
print(f'\nDefault stream updated to: {args.default_stream}')
@@ -616,10 +385,10 @@ def main():
clear_temp_files()
if args.show_config:
print(f'\ndownloadDIR: {downloadDIR}\ntempDIR: {tempDIR}\nconfigDIR: {configDIR}\ndefaultStream: {defaultStream}\n')
print(f'\ndownloadDIR: {downloader.download_dir}\ntempDIR: {downloader.temp_dir}\nconfigDIR: {downloader.config_dir}\ndefaultStream: {downloader.default_stream}\n')
if args.version:
print(f'pytubepp {version}')
print(f'pytubepp {downloader.version}')
if args.show_info:
print('\nNo video url supplied! exiting...!!')
@@ -630,9 +399,8 @@ def main():
if args.json_prettify and not args.raw_info:
print('\nMissing flag! -jp flag must be used with a flag which returns json data...!! (eg: -ri)')
if 'stream' in args:
if hasattr(args, 'stream'):
print('\nNo video url supplied! exiting...!!')
if __name__ == "__main__":
main()