#!/usr/bin/python3
"""Mirror new videos from configured YouTube channels to a PeerTube instance.

For every channel in ``config.toml`` the script lists the channel's videos
with yt-dlp, skips the ids already recorded in ``video_list.txt``, fetches the
thumbnail (and, unless PeerTube HTTP import is enabled, the video itself) and
pushes the result to PeerTube through its REST API.
"""
import getopt
import json
import mimetypes
import sys
from os import mkdir, path
from pathlib import Path
from shutil import rmtree
from time import sleep
from urllib.error import HTTPError
from urllib.request import urlopen

import requests
import yt_dlp
from requests_toolbelt.multipart.encoder import MultipartEncoder

import utils

VIDEO_LIST_FILENAME = "video_list.txt"

_USAGE = "youtube2peertube.py [-o|--once]"


def _channel_dir(dl_dir, channel_conf):
    """Return the per-channel working directory ``<dl_dir>/<channel name>``."""
    return Path(dl_dir) / channel_conf["name"]


def _append_file_fields(fields, opened_files, file_specs):
    """Append one multipart file field per ``(field_name, file_path)`` pair.

    Every file handle opened along the way is recorded in ``opened_files`` so
    the caller can close it once the request has been sent.
    """
    for field_name, file_path in file_specs:
        part = get_file(str(file_path))
        opened_files.append(part[1])
        fields.append((field_name, part))


def _close_all(opened_files):
    """Close every file handle collected while building a multipart body."""
    for handle in opened_files:
        handle.close()


def get_video_data(channel_id):
    """Return the channel's videos that are not yet listed in the video list.

    Args:
        channel_id: YouTube channel id, appended to
            ``https://youtube.com/channel/``.

    Returns:
        A list of yt-dlp entry dicts, in reverse of the order yt-dlp returned
        them, whose ``id`` is absent from ``VIDEO_LIST_FILENAME``. An id that
        occurs more than once in the listing is queued only once.
    """
    with yt_dlp.YoutubeDL() as ydl:
        channel = ydl.extract_info(
            "https://youtube.com/channel/" + channel_id, download=False)
    entries = channel["entries"]

    try:
        with open(VIDEO_LIST_FILENAME, "r") as video_list_file:
            known_ids = set(video_list_file.read().split("\n"))
    except FileNotFoundError:
        # First run: nothing has been mirrored yet.
        known_ids = set()

    queue = []
    for entry in reversed(list(entries)):
        if entry["id"] not in known_ids:
            queue.append(entry)
            known_ids.add(entry["id"])
    return queue


def write_completion(video_id):
    """Append ``video_id`` to the list of already mirrored videos."""
    with open(VIDEO_LIST_FILENAME, "a") as video_list_file:
        video_list_file.write("\n" + video_id)


def download_yt_video(queue_item, dl_dir, channel_conf):
    """Download one video into the channel's working directory.

    The file is written as ``<dl_dir>/<name>/<id>.<preferred_extension>``,
    which is the path ``upload_to_pt`` reads back. Failures are reported on
    stdout and otherwise ignored (best effort); the subsequent upload step
    then fails for that video and records it as an error.

    Args:
        queue_item: yt-dlp entry dict (``original_url`` and ``id`` are read).
        dl_dir: Base download directory.
        channel_conf: Channel configuration (``name`` and
            ``preferred_extension`` are read).
    """
    url = queue_item["original_url"]
    try:
        filepath = _channel_dir(dl_dir, channel_conf) / (
            queue_item["id"] + "." + channel_conf["preferred_extension"])
        ydl_opts = {
            "format": "best",
            # A literal "%" must be doubled inside a yt-dlp output template.
            "outtmpl": str(filepath).replace("%", "%%"),
            "noplaylist": True,
            "merge_output_format": channel_conf["preferred_extension"],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except Exception as err:  # per-video boundary: keep the run going
        # TODO: log exceptions
        print("download of " + url + " failed: " + repr(err))


def save_metadata(queue_item, dl_dir, channel_conf):
    """Write the video's metadata to ``<dl_dir>/<name>/<id>.txt``.

    The file holds a short human-readable header (title, link, author,
    published, description) followed by the ``str()`` of the raw entry dict.
    """
    link = queue_item["original_url"]
    title = queue_item["title"]
    description = queue_item["description"]
    author = queue_item["uploader"]
    published = queue_item["upload_date"]
    metadata_file = _channel_dir(dl_dir, channel_conf) / (queue_item["id"] + ".txt")
    # UTF-8 so that non-ASCII titles/descriptions do not depend on the locale.
    with open(metadata_file, "w", encoding="utf-8") as metadata:
        # save relevant metadata as semicolon separated easy to read values to text file
        metadata.write('title: "' + title + '";\n\nlink: "' + link
                       + '";\n\nauthor: "' + author + '";\n\npublished: "'
                       + published + '";\n\ndescription: "' + description
                       + '"\n\n;')
        # save raw metadata as the str() of the entry dict
        metadata.write(str(queue_item))


def save_thumbnail(queue_item, dl_dir, channel_conf):
    """Fetch the video's ``maxresdefault`` WebP thumbnail and store it.

    The image is written to ``<dl_dir>/<name>/<id>.webp``.

    Returns:
        The file extension used for the thumbnail (always ``"webp"``).

    Raises:
        urllib.error.HTTPError: for any HTTP error other than 404.
    """
    extension = "webp"
    url = "https://i.ytimg.com/vi_webp/%s/maxresdefault.webp" % queue_item["id"]
    outfile = _channel_dir(dl_dir, channel_conf) / (queue_item["id"] + "." + extension)
    try:
        data = urlopen(url).read()
    except HTTPError as e:
        if e.code != 404:
            raise
        # On 404 the body of the error response is stored instead.
        # NOTE(review): presumably the server sends a placeholder image
        # with the 404 - confirm.
        data = e.read()
    with open(outfile, "wb") as out:
        out.write(data)
    return extension


def get_pt_auth(channel_conf):
    """Obtain a PeerTube access token for the configured user.

    Reads ``peertube_instance``, ``peertube_username`` and
    ``peertube_password`` from ``channel_conf``, fetches the instance's local
    OAuth client and exchanges the credentials for a token.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        requests.HTTPError: if either request returns an error status.
    """
    pt_api = channel_conf["peertube_instance"] + "/api/v1"
    pt_uname = channel_conf["peertube_username"]
    pt_passwd = channel_conf["peertube_password"]

    # get client ID and secret from peertube instance
    client_response = requests.get(pt_api + "/oauth-clients/local")
    client_response.raise_for_status()
    id_secret = client_response.json()

    # form data for the post request that yields the access token
    auth_data = {
        'client_id': id_secret["client_id"],
        'client_secret': id_secret["client_secret"],
        'grant_type': 'password',
        'response_type': 'code',
        'username': pt_uname,
        'password': pt_passwd,
    }
    token_response = requests.post(pt_api + "/users/token", data=auth_data)
    token_response.raise_for_status()
    return token_response.json()["access_token"]


def get_pt_channel_id(channel_conf):
    """Return the ``id`` field of the configured PeerTube video channel."""
    pt_api = channel_conf["peertube_instance"] + "/api/v1"
    channel_url = pt_api + "/video-channels/" + channel_conf["peertube_channel"] + "/"
    return requests.get(channel_url).json()["id"]


def get_file(file_path):
    """Build a ``(filename, file object, mime type)`` multipart file tuple.

    ``.webp`` files are always labelled ``image/webp``; other files use the
    type guessed from their name, falling back to
    ``application/octet-stream``. The caller owns the returned binary file
    object and must close it.
    """
    file_path = str(file_path)
    if path.splitext(file_path)[1].lower() == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
    return (path.basename(file_path), open(path.abspath(file_path), 'rb'), mime_type)


def handle_peertube_result(request_result):
    """Return True for a status code below 300; otherwise print the response and return False."""
    if request_result.status_code < 300:
        return True
    print(request_result)
    return False


def upload_to_pt(dl_dir, channel_conf, queue_item, access_token, thumb_extension):
    """Upload a downloaded video and its thumbnail to PeerTube.

    Args:
        dl_dir: Base download directory.
        channel_conf: Channel configuration dict.
        queue_item: yt-dlp entry dict of the video.
        access_token: Bearer token from ``get_pt_auth``.
        thumb_extension: Extension of the stored thumbnail file.

    Returns:
        True if PeerTube answered with a status below 300, False if it did
        not or if the request fields could not be built (for example because
        the downloaded file is missing).
    """
    # Adapted from Prismedia https://git.lecygnenoir.info/LecygneNoir/prismedia
    pt_api = channel_conf["peertube_instance"] + "/api/v1"
    media_dir = _channel_dir(dl_dir, channel_conf)
    video_file = media_dir / (queue_item["id"] + "." + channel_conf["preferred_extension"])
    thumb_file = media_dir / (queue_item["id"] + "." + thumb_extension)
    description = (channel_conf["description_prefix"] + "\n\n"
                   + queue_item["description"] + "\n\n"
                   + channel_conf["description_suffix"])
    channel_id = str(get_pt_channel_id(channel_conf))
    category = utils.set_pt_category(channel_conf["pt_channel_category"])

    opened_files = []
    try:
        # We need to transform fields into tuple to deal with tags as
        # MultipartEncoder does not support list refer
        # https://github.com/requests/toolbelt/issues/190 and
        # https://github.com/requests/toolbelt/issues/205
        try:
            fields = [
                ("name", queue_item["title"]),
                ("licence", "1"),
                ("description", description),
                ("nsfw", channel_conf["nsfw"]),
                ("channelId", channel_id),
                ("originallyPublishedAt",
                 utils.get_originally_uploaded_pt(queue_item["upload_date"])),
                ("category", category),
                # NOTE(review): pt_http_import passes the language through
                # utils.set_pt_lang; here the configured value is sent as is.
                ("language", channel_conf["default_lang"]),
                ("privacy", str(channel_conf["pt_privacy"])),
                ("commentsEnabled", channel_conf["comments_enabled"]),
            ]
            _append_file_fields(fields, opened_files, (
                ("videofile", video_file),
                ("thumbnailfile", thumb_file),
                ("previewfile", thumb_file),
            ))
            fields.append(("waitTranscoding", 'false'))
        except Exception as err:  # per-video boundary: report and skip
            print("could not prepare upload of " + queue_item["id"] + ": " + repr(err))
            return False

        if channel_conf["pt_tags"] != "":
            # NOTE(review): pt_http_import sends tags as "tags[]" without
            # brackets; confirm which encoding the PeerTube API expects.
            fields.append(("tags", "[" + channel_conf["pt_tags"] + "]"))
        else:
            print("you have no tags in your configuration file for this channel")

        multipart_data = MultipartEncoder(fields)
        headers = {
            'Content-Type': multipart_data.content_type,
            'Authorization': "Bearer " + access_token
        }
        return handle_peertube_result(
            requests.post(pt_api + "/videos/upload", data=multipart_data, headers=headers))
    finally:
        # The encoder reads the files while the request is sent, so they can
        # only be closed once the request has finished (or failed).
        _close_all(opened_files)


def pt_http_import(dl_dir, channel_conf, queue_item, access_token, thumb_extension):
    """Ask PeerTube to import the video directly from its YouTube URL.

    Only the thumbnail is sent from local disk; the video itself is
    referenced through ``targetUrl``.

    Args:
        dl_dir: Base download directory.
        channel_conf: Channel configuration dict.
        queue_item: yt-dlp entry dict of the video.
        access_token: Bearer token from ``get_pt_auth``.
        thumb_extension: Extension of the stored thumbnail file.

    Returns:
        True if PeerTube answered with a status below 300, else False.
    """
    # Adapted from Prismedia https://git.lecygnenoir.info/LecygneNoir/prismedia
    pt_api = channel_conf["peertube_instance"] + "/api/v1"
    yt_video_url = queue_item["original_url"]
    thumb_file = _channel_dir(dl_dir, channel_conf) / (queue_item["id"] + "." + thumb_extension)
    description = (channel_conf["description_prefix"] + "\n\n"
                   + queue_item["description"] + "\n\n"
                   + channel_conf["description_suffix"])
    channel_id = str(get_pt_channel_id(channel_conf))
    language = utils.set_pt_lang(None, channel_conf["default_lang"])
    category = utils.set_pt_category(channel_conf["pt_channel_category"])

    # We need to transform fields into tuple to deal with tags as
    # MultipartEncoder does not support list refer
    # https://github.com/requests/toolbelt/issues/190 and
    # https://github.com/requests/toolbelt/issues/205
    fields = [
        ("name", queue_item["title"]),
        ("licence", "1"),
        ("description", description),
        ("nsfw", channel_conf["nsfw"]),
        ("channelId", channel_id),
        ("originallyPublishedAt",
         utils.get_originally_uploaded_pt(queue_item["upload_date"])),
        ("category", category),
        ("language", language),
        ("privacy", str(channel_conf["pt_privacy"])),
        ("commentsEnabled", channel_conf["comments_enabled"]),
        ("targetUrl", yt_video_url),
    ]
    opened_files = []
    try:
        _append_file_fields(fields, opened_files, (
            ("thumbnailfile", thumb_file),
            ("previewfile", thumb_file),
        ))
        fields.append(("waitTranscoding", 'false'))

        if channel_conf["pt_tags"] != "":
            fields.append(("tags[]", channel_conf["pt_tags"]))
        else:
            print("you have no tags in your configuration file for this channel")

        multipart_data = MultipartEncoder(fields)
        headers = {
            'Content-Type': multipart_data.content_type,
            'Authorization': "Bearer " + access_token
        }
        return handle_peertube_result(
            requests.post(pt_api + "/videos/imports", data=multipart_data, headers=headers))
    finally:
        _close_all(opened_files)


def log_upload_error(yt_url, channel_conf):
    """Append ``<channel name>,<url>`` to ``video_errors.csv`` and print a notice."""
    with open("video_errors.csv", "a") as error_file:
        error_file.write(channel_conf['name'] + "," + yt_url + "\n")
    print("error !")


def run_steps(conf):
    """Run one mirroring pass over every channel in the configuration.

    For each channel with new videos: fetch the thumbnails (and the videos,
    unless HTTP import is enabled), optionally save the metadata, push every
    video to PeerTube, record successes in the video list and failures in
    ``video_errors.csv``, then optionally delete the channel's working
    directory.

    Args:
        conf: Parsed configuration with a ``global`` table and a ``channel``
            table keyed by channel entry.
    """
    # TODO: logging
    channel = conf["channel"]
    global_conf = conf["global"]
    delete_videos = global_conf["delete_videos"] == "true"
    use_pt_http_import = global_conf["use_pt_http_import"] == "true"
    if use_pt_http_import:
        # Videos are not downloaded at all in this mode; forcing deletion
        # removes the thumbnails afterwards.
        delete_videos = True

    dl_dir = global_conf["video_download_dir"]
    Path(dl_dir).mkdir(parents=True, exist_ok=True)

    # run loop for every channel in the configuration file
    for channel_conf in channel.values():
        print("\n")
        # Id and settings come from the same entry so they cannot get out of step.
        queue = get_video_data(channel_conf["channel_id"])
        if not queue:
            continue

        media_dir = _channel_dir(dl_dir, channel_conf)
        media_dir.mkdir(parents=True, exist_ok=True)

        # download videos, metadata and thumbnails from youtube
        thumb_extensions = {}
        for queue_item in queue:
            if not use_pt_http_import:
                print("downloading " + queue_item["id"] + " from YouTube...")
                download_yt_video(queue_item, dl_dir, channel_conf)
                print("done.")
            # TODO: download closest to config specified resolution instead of best resolution
            thumb_extensions[queue_item["id"]] = save_thumbnail(
                queue_item, dl_dir, channel_conf)
            # only save metadata to text file if archiving videos
            if not delete_videos:
                print("saving video metadata...")
                save_metadata(queue_item, dl_dir, channel_conf)
                print("done.")

        access_token = get_pt_auth(channel_conf)

        # upload videos, metadata and thumbnails to peertube
        for queue_item in queue:
            thumb_extension = thumb_extensions[queue_item["id"]]
            if not use_pt_http_import:
                print("uploading " + queue_item["id"] + " to Peertube...")
                pt_result = upload_to_pt(
                    dl_dir, channel_conf, queue_item, access_token, thumb_extension)
            else:
                print("mirroring " + queue_item["original_url"]
                      + " to Peertube using HTTP import...")
                pt_result = pt_http_import(
                    dl_dir, channel_conf, queue_item, access_token, thumb_extension)
            if pt_result:
                write_completion(queue_item["id"])
                print("done !")
            else:
                log_upload_error(queue_item["original_url"], channel_conf)

        if delete_videos:
            print("deleting videos and/or thumbnails...")
            rmtree(media_dir, ignore_errors=True)
            print("done")


def run(run_once=True):
    """Read ``config.toml`` and mirror once, or forever at the configured interval.

    Args:
        run_once: When true, perform a single pass and return. Otherwise
            repeat the pass, sleeping ``global.poll_frequency`` minutes
            between passes.
    """
    # TODO: turn this into a daemon
    conf = utils.read_conf("config.toml")
    if run_once:
        run_steps(conf)
        return
    poll_frequency = int(conf["global"]["poll_frequency"]) * 60
    while True:
        run_steps(conf)
        sleep(poll_frequency)


def main(argv):
    """Parse the command line and start mirroring.

    Options:
        -h, --help: print the usage line and exit.
        -o, --once: perform a single pass instead of polling forever.

    An unknown option prints the usage line and exits with status 2.
    """
    run_once = False
    try:
        opts, _args = getopt.getopt(argv, "ho", ["help", "once"])
    except getopt.GetoptError:
        print(_USAGE)
        sys.exit(2)
    for opt, _arg in opts:
        if opt in ("-h", "--help"):
            print(_USAGE)
            sys.exit()
        elif opt in ("-o", "--once"):
            run_once = True
    run(run_once)


if __name__ == "__main__":
    main(sys.argv[1:])