mirror of https://github.com/KevinMidboe/bulk-downloader-for-reddit.git
synced 2025-10-29 17:40:15 +00:00
Initial commit
0    src/__init__.py    Normal file
285    src/downloader.py    Normal file
@@ -0,0 +1,285 @@
import os
import sys
import urllib.request
from pathlib import Path

from src.errors import (AlbumNotDownloadedCompletely, FileAlreadyExistsError,
                        FileNameTooLong, ImgurLoginError,
                        NotADownloadableLinkError)
from src.tools import GLOBAL, nameCorrector, printToFile

try:
    from imgurpython import *
except ModuleNotFoundError:
    print("\nimgurpython not found on your computer, installing...\n")
    from src.tools import install
    install("imgurpython")
    from imgurpython import *


print = printToFile

def dlProgress(count, blockSize, totalSize):
    """Function for writing download progress to console"""

    downloadedMbs = int(count*blockSize*(10**(-6)))
    fileSize = int(totalSize*(10**(-6)))
    sys.stdout.write("\r{}Mb/{}Mb".format(downloadedMbs,fileSize))
    sys.stdout.write("\b"*len("\r{}Mb/{}Mb".format(downloadedMbs,fileSize)))
    sys.stdout.flush()
def getExtension(link):
    """Extract the file extension from an image link.
    If no known extension is found, return '.jpg'
    """

    imageTypes = ['jpg','png','mp4','webm','gif']
    parsed = link.split('.')
    for TYPE in imageTypes:
        if TYPE in parsed:
            return "."+parsed[-1]
    else:
        return '.jpg'
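# A minimal sketch of the expected behaviour, with illustrative URLs
# (not part of the commit itself):
#
#   >>> getExtension("https://i.imgur.com/AbCdEf.mp4")
#   '.mp4'
#   >>> getExtension("https://imgur.com/AbCdEf")      # no known extension
#   '.jpg'
#
# Note the for/else: the else branch only runs when the loop finishes
# without returning, which is what makes '.jpg' the fallback.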
def getFile(fileDir,tempDir,imageURL,indent=0):
    """Download the given file to the given directory.

    fileDir  -- Full path of the target file
    tempDir  -- Full path of the same file with a '.tmp' extension
    imageURL -- URL of the file to be downloaded

    Raises FileNameTooLong when the file name is too long for the file
    system, so callers can retry with a shorter name.
    """

    if not (os.path.isfile(fileDir)):
        for i in range(3):
            try:
                urllib.request.urlretrieve(imageURL,
                                           tempDir,
                                           reporthook=dlProgress)
                os.rename(tempDir,fileDir)
                print(" "*indent+"Downloaded"+" "*10)
                break
            except ConnectionResetError as exception:
                print(" "*indent + str(exception))
                print(" "*indent + "Trying again\n")
            except FileNotFoundError:
                raise FileNameTooLong
    else:
        raise FileAlreadyExistsError
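# Illustrative usage, assuming a pathlib.Path directory (hypothetical
# names; the real callers are the classes below):
#
#   folder = Path("downloads")
#   getFile(folder / "cat_abc123.jpg",       # final name
#           folder / "cat_abc123.tmp",       # partial download
#           "https://i.imgur.com/xyz.jpg")
#
# The transfer is written to the .tmp path first and only renamed to the
# final name on success, so an interrupted download (retried up to three
# times on ConnectionResetError) never leaves a file that looks complete.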
class Imgur:
    def __init__(self,directory,post):
        self.imgurClient = self.initImgur()

        imgurID = self.getId(post['postURL'])
        content = self.getLink(imgurID)

        if not os.path.exists(directory): os.makedirs(directory)

        if content['type'] == 'image':

            try:
                post['mediaURL'] = content['object'].mp4
            except AttributeError:
                post['mediaURL'] = content['object'].link

            post['postExt'] = getExtension(post['mediaURL'])

            title = nameCorrector(post['postTitle'])
            print(title+"_"+post['postId']+post['postExt'])

            fileDir = title + "_" + post['postId'] + post['postExt']
            fileDir = directory / fileDir

            tempDir = title + "_" + post['postId'] + '.tmp'
            tempDir = directory / tempDir
            try:
                getFile(fileDir,tempDir,post['mediaURL'])
            except FileNameTooLong:
                fileDir = directory / (post['postId'] + post['postExt'])
                tempDir = directory / (post['postId'] + '.tmp')
                getFile(fileDir,tempDir,post['mediaURL'])

        elif content['type'] == 'album':
            exceptionType = ""
            images = content['object'].images
            imagesLength = len(images)
            howManyDownloaded = imagesLength
            duplicates = 0

            title = nameCorrector(post['postTitle'])
            print(title+"_"+post['postId'],end="\n\n")

            folderDir = directory / (title+"_"+post['postId'])

            try:
                if not os.path.exists(folderDir):
                    os.makedirs(folderDir)
            except FileNotFoundError:
                folderDir = directory / post['postId']
                os.makedirs(folderDir)

            for i in range(imagesLength):
                try:
                    imageURL = images[i]['mp4']
                except KeyError:
                    imageURL = images[i]['link']

                images[i]['Ext'] = getExtension(imageURL)

                fileName = (str(i+1)
                            + "_"
                            + nameCorrector(str(images[i]['title']))
                            + "_"
                            + images[i]['id'])

                fileDir = folderDir / (fileName + images[i]['Ext'])
                tempDir = folderDir / (fileName + ".tmp")

                print(" ({}/{})".format(i+1,imagesLength))
                print(" {}".format(fileName+images[i]['Ext']))

                try:
                    getFile(fileDir,tempDir,imageURL,indent=2)
                    print()
                except FileAlreadyExistsError:
                    print(" The file already exists" + " "*10,end="\n\n")
                    duplicates += 1
                    howManyDownloaded -= 1

                # IF FILE NAME IS TOO LONG, IT WON'T REGISTER
                except FileNameTooLong:
                    fileName = (str(i+1) + "_" + images[i]['id'])
                    fileDir = folderDir / (fileName + images[i]['Ext'])
                    tempDir = folderDir / (fileName + ".tmp")
                    try:
                        getFile(fileDir,tempDir,imageURL,indent=2)
                    # IF STILL TOO LONG
                    except FileNameTooLong:
                        fileName = str(i+1)
                        fileDir = folderDir / (fileName + images[i]['Ext'])
                        tempDir = folderDir / (fileName + ".tmp")
                        getFile(fileDir,tempDir,imageURL,indent=2)

                except Exception as exception:
                    print("\n Could not get the file")
                    print(" " + str(exception) + "\n")
                    exceptionType = exception
                    howManyDownloaded -= 1

            if duplicates == imagesLength:
                raise FileAlreadyExistsError
            elif howManyDownloaded < imagesLength:
                raise AlbumNotDownloadedCompletely

    @staticmethod
    def initImgur():
        """Initialize imgur api"""

        config = GLOBAL.config
        return ImgurClient(
            config['imgur_client_id'],
            config['imgur_client_secret']
        )

    def getId(self,submissionURL):
        """Extract the imgur post id
        and determine whether it is a single image or an album
        """

        domainLength = len("imgur.com/")
        if submissionURL[-1] == "/":
            submissionURL = submissionURL[:-1]

        if "a/" in submissionURL or "gallery/" in submissionURL:
            albumId = submissionURL.split("/")[-1]
            return {'id':albumId, 'type':'album'}

        else:
            url = submissionURL.replace('.','/').split('/')
            imageId = url[url.index('com')+1]
            return {'id':imageId, 'type':'image'}

    def getLink(self,identity):
        """Request an imgur object from the imgur api"""

        if identity['type'] == 'image':
            return {'object':self.imgurClient.get_image(identity['id']),
                    'type':'image'}
        elif identity['type'] == 'album':
            return {'object':self.imgurClient.get_album(identity['id']),
                    'type':'album'}

    @staticmethod
    def get_credits():
        return Imgur.initImgur().get_credits()
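# A sketch of what getId returns for the two URL shapes it recognizes
# (illustrative values; getId never touches self, so it can be exercised
# as Imgur.getId(None, url)):
#
#   >>> Imgur.getId(None, "https://imgur.com/a/AbCdE")
#   {'id': 'AbCdE', 'type': 'album'}
#   >>> Imgur.getId(None, "https://imgur.com/AbCdE")
#   {'id': 'AbCdE', 'type': 'image'}
#
# The image branch replaces '.' with '/' and takes the token right after
# 'com', so "https://imgur.com/AbCdE" tokenizes to
# ['https:', '', 'imgur', 'com', 'AbCdE'].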
class Gfycat:
    def __init__(self,directory,POST):
        try:
            POST['mediaURL'] = self.getLink(POST['postURL'])
        except IndexError:
            raise NotADownloadableLinkError
        except Exception as exception:
            raise NotADownloadableLinkError

        POST['postExt'] = getExtension(POST['mediaURL'])

        if not os.path.exists(directory): os.makedirs(directory)
        title = nameCorrector(POST['postTitle'])
        print(title+"_"+POST['postId']+POST['postExt'])

        fileDir = directory / (title+"_"+POST['postId']+POST['postExt'])
        tempDir = directory / (title+"_"+POST['postId']+".tmp")

        getFile(fileDir,tempDir,POST['mediaURL'])

    def getLink(self, url, query='<source id="mp4Source" src=', lineNumber=105):
        """Extract the direct link to the video from the page source
        and return it
        """

        if '.webm' in url or '.mp4' in url or '.gif' in url:
            return url

        if url[-1:] == '/':
            url = url[:-1]

        if 'gifs' in url:
            url = "https://gfycat.com/" + url.split('/')[-1]

        pageSource = (urllib.request.urlopen(url).read().decode().split('\n'))

        theLine = pageSource[lineNumber]
        length = len(query)
        link = []

        for i in range(len(theLine)):
            if theLine[i:i+length] == query:
                cursor = (i+length)+1
                while not theLine[cursor] == '"':
                    link.append(theLine[cursor])
                    cursor += 1
                break

        if "".join(link) == "":
            raise NotADownloadableLinkError

        return "".join(link)
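# getLink above depends on the <source> tag sitting exactly on line 105 of
# gfycat's page source, which breaks whenever the page layout shifts. A
# more tolerant sketch of the same extraction, scanning the whole page with
# the standard library instead (an assumption, not what this commit ships;
# getLinkByRegex is a hypothetical name):
#
#   import re
#   def getLinkByRegex(pageSource):
#       match = re.search(r'<source id="mp4Source" src="([^"]+)"', pageSource)
#       if match is None:
#           raise NotADownloadableLinkError
#       return match.group(1)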
class Direct:
    def __init__(self,directory,POST):
        POST['postExt'] = getExtension(POST['postURL'])
        if not os.path.exists(directory): os.makedirs(directory)
        title = nameCorrector(POST['postTitle'])
        print(title+"_"+POST['postId']+POST['postExt'])

        fileDir = title+"_"+POST['postId']+POST['postExt']
        fileDir = directory / fileDir

        tempDir = title+"_"+POST['postId']+".tmp"
        tempDir = directory / tempDir

        getFile(fileDir,tempDir,POST['postURL'])
41    src/errors.py    Normal file
@@ -0,0 +1,41 @@
class RedditLoginFailed(Exception):
    pass

class ImgurLoginError(Exception):
    pass

class FileAlreadyExistsError(Exception):
    pass

class NotADownloadableLinkError(Exception):
    pass

class AlbumNotDownloadedCompletely(Exception):
    pass

class FileNameTooLong(Exception):
    pass

class InvalidRedditLink(Exception):
    pass

class NoMatchingSubmissionFound(Exception):
    pass

class NoPrawSupport(Exception):
    pass

class NoRedditSupport(Exception):
    pass

class MultiredditNotFound(Exception):
    pass

class InsufficientPermission(Exception):
    pass

class InvalidSortingType(Exception):
    pass

class FileNotFoundError(Exception):
    pass
239    src/parser.py    Normal file
@@ -0,0 +1,239 @@
from pprint import pprint

try:
    from src.errors import InvalidRedditLink
except ModuleNotFoundError:
    from errors import InvalidRedditLink

def QueryParser(PassedQueries,index):
    ExtractedQueries = {}

    QuestionMarkIndex = PassedQueries.index("?")
    Header = PassedQueries[:QuestionMarkIndex]
    ExtractedQueries["HEADER"] = Header
    Queries = PassedQueries[QuestionMarkIndex+1:]

    ParsedQueries = Queries.split("&")

    for Query in ParsedQueries:
        Query = Query.split("=")
        ExtractedQueries[Query[0]] = Query[1]

    if ExtractedQueries["HEADER"] == "search":
        ExtractedQueries["q"] = ExtractedQueries["q"].replace("%20"," ")

    return ExtractedQueries
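# A worked example of what QueryParser extracts (illustrative; the index
# argument is only ever passed through by LinkParser below):
#
#   >>> QueryParser("search?q=golden%20retriever&restrict_sr=on&sort=top", 0)
#   {'HEADER': 'search', 'q': 'golden retriever', 'restrict_sr': 'on', 'sort': 'top'}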
def LinkParser(LINK):
    RESULT = {}
    ShortLink = False

    if not "reddit.com" in LINK:
        raise InvalidRedditLink

    SplittedLink = LINK.split("/")

    if SplittedLink[0] == "https:" or SplittedLink[0] == "http:":
        SplittedLink = SplittedLink[2:]

    try:
        if (SplittedLink[-2].endswith("reddit.com") and \
            SplittedLink[-1] == "") or \
            SplittedLink[-1].endswith("reddit.com"):

            RESULT["sort"] = "best"
            return RESULT
    except IndexError:
        if SplittedLink[0].endswith("reddit.com"):
            RESULT["sort"] = "best"
            return RESULT

    if "redd.it" in SplittedLink:
        ShortLink = True

    if SplittedLink[0].endswith("reddit.com"):
        SplittedLink = SplittedLink[1:]

    if "comments" in SplittedLink:
        RESULT = {"post":LINK}
        return RESULT

    elif "me" in SplittedLink or \
         "u" in SplittedLink or \
         "user" in SplittedLink or \
         "r" in SplittedLink or \
         "m" in SplittedLink:

        if "r" in SplittedLink:
            RESULT["subreddit"] = SplittedLink[SplittedLink.index("r") + 1]

        elif "m" in SplittedLink:
            RESULT["multireddit"] = SplittedLink[SplittedLink.index("m") + 1]
            RESULT["user"] = SplittedLink[SplittedLink.index("m") - 1]

        else:
            for index in range(len(SplittedLink)):
                if SplittedLink[index] == "u" or \
                   SplittedLink[index] == "user":

                    RESULT["user"] = SplittedLink[index+1]

                elif SplittedLink[index] == "me":
                    RESULT["user"] = "me"


    for index in range(len(SplittedLink)):
        if SplittedLink[index] in [
            "hot","top","new","controversial","rising"
        ]:

            RESULT["sort"] = SplittedLink[index]

            if index == 0:
                RESULT["subreddit"] = "frontpage"

        elif SplittedLink[index] in ["submitted","saved","posts","upvoted"]:
            if SplittedLink[index] == "submitted" or \
               SplittedLink[index] == "posts":
                RESULT["submitted"] = {}

            elif SplittedLink[index] == "saved":
                RESULT["saved"] = True

            elif SplittedLink[index] == "upvoted":
                RESULT["upvoted"] = True

        elif "?" in SplittedLink[index]:
            ParsedQuery = QueryParser(SplittedLink[index],index)
            if ParsedQuery["HEADER"] == "search":
                del ParsedQuery["HEADER"]
                RESULT["search"] = ParsedQuery

            elif ParsedQuery["HEADER"] == "submitted" or \
                 ParsedQuery["HEADER"] == "posts":
                del ParsedQuery["HEADER"]
                RESULT["submitted"] = ParsedQuery

            else:
                del ParsedQuery["HEADER"]
                RESULT["queries"] = ParsedQuery

    if not ("upvoted" in RESULT or \
            "saved" in RESULT or \
            "submitted" in RESULT or \
            "multireddit" in RESULT) and \
       "user" in RESULT:
        RESULT["submitted"] = {}

    return RESULT
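# Sketches of LinkParser output for common link shapes (illustrative):
#
#   >>> LinkParser("https://www.reddit.com/r/pics/top/")
#   {'subreddit': 'pics', 'sort': 'top'}
#   >>> LinkParser("https://www.reddit.com/user/spez/submitted/")
#   {'user': 'spez', 'submitted': {}}
#   >>> LinkParser("https://www.reddit.com")
#   {'sort': 'best'}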
def LinkDesigner(LINK):

    attributes = LinkParser(LINK)
    MODE = {}

    if "post" in attributes:
        MODE["post"] = attributes["post"]
        MODE["sort"] = ""
        MODE["time"] = ""
        return MODE

    elif "search" in attributes:
        MODE["search"] = attributes["search"]["q"]

        if "restrict_sr" in attributes["search"]:

            if not (attributes["search"]["restrict_sr"] == 0 or \
                    attributes["search"]["restrict_sr"] == "off"):

                if "subreddit" in attributes:
                    MODE["subreddit"] = attributes["subreddit"]
                elif "multireddit" in attributes:
                    MODE["multireddit"] = attributes["multireddit"]
                    MODE["user"] = attributes["user"]
            else:
                MODE["subreddit"] = "all"
        else:
            MODE["subreddit"] = "all"

        if "t" in attributes["search"]:
            MODE["time"] = attributes["search"]["t"]
        else:
            MODE["time"] = "all"

        if "sort" in attributes["search"]:
            MODE["sort"] = attributes["search"]["sort"]
        else:
            MODE["sort"] = "relevance"

        if "include_over_18" in attributes["search"]:
            if attributes["search"]["include_over_18"] == 1 or \
               attributes["search"]["include_over_18"] == "on":
                MODE["nsfw"] = True
            else:
                MODE["nsfw"] = False

    else:
        if "queries" in attributes:
            if not ("submitted" in attributes or \
                    "posts" in attributes):

                if "t" in attributes["queries"]:
                    MODE["time"] = attributes["queries"]["t"]
                else:
                    MODE["time"] = "day"
            else:
                if "t" in attributes["queries"]:
                    MODE["time"] = attributes["queries"]["t"]
                else:
                    MODE["time"] = "all"

            if "sort" in attributes["queries"]:
                MODE["sort"] = attributes["queries"]["sort"]
            else:
                MODE["sort"] = "new"
        else:
            MODE["time"] = "day"

    if "subreddit" in attributes and not "search" in attributes:
        MODE["subreddit"] = attributes["subreddit"]

    elif "user" in attributes and not "search" in attributes:
        MODE["user"] = attributes["user"]

        if "submitted" in attributes:
            MODE["submitted"] = True
            if "sort" in attributes["submitted"]:
                MODE["sort"] = attributes["submitted"]["sort"]
            elif "sort" in MODE:
                pass
            else:
                MODE["sort"] = "new"

            if "t" in attributes["submitted"]:
                MODE["time"] = attributes["submitted"]["t"]
            else:
                MODE["time"] = "all"

        elif "saved" in attributes:
            MODE["saved"] = True

        elif "upvoted" in attributes:
            MODE["upvoted"] = True

    elif "multireddit" in attributes:
        MODE["multireddit"] = attributes["multireddit"]

    if "sort" in attributes:
        MODE["sort"] = attributes["sort"]
    elif "sort" in MODE:
        pass
    else:
        MODE["sort"] = "hot"

    return MODE

if __name__ == "__main__":
    while True:
        link = input("> ")
        pprint(LinkDesigner(link))
453    src/searcher.py    Normal file
@@ -0,0 +1,453 @@
import os
import random
import socket
import webbrowser

try:
    import praw
except ModuleNotFoundError:
    print("\nPRAW not found on your computer, installing...\n")
    from src.tools import install
    install("praw")
    import praw

from prawcore.exceptions import NotFound, ResponseException, Forbidden

from src.tools import GLOBAL, createLogFile, jsonFile, printToFile
from src.errors import (NoMatchingSubmissionFound, NoPrawSupport,
                        NoRedditSupport, MultiredditNotFound,
                        InvalidSortingType, RedditLoginFailed,
                        InsufficientPermission)

print = printToFile
class GetAuth:
    def __init__(self,redditInstance,port):
        self.redditInstance = redditInstance
        self.PORT = int(port)

    def receive_connection(self):
        """Wait for and then return a connected socket.
        Opens a TCP socket on the configured port and waits for a single
        client.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(('localhost', self.PORT))
        server.listen(1)
        client = server.accept()[0]
        server.close()
        return client

    def send_message(self, message):
        """Send message to client and close the connection."""
        self.client.send('HTTP/1.1 200 OK\r\n\r\n{}'.format(message).encode('utf-8'))
        self.client.close()

    def getRefreshToken(self,*scopes):
        state = str(random.randint(0, 65000))
        url = self.redditInstance.auth.url(scopes, state, 'permanent')
        print("Go to this URL and login to reddit:\n\n",url)
        webbrowser.open(url,new=2)

        self.client = self.receive_connection()
        data = self.client.recv(1024).decode('utf-8')
        param_tokens = data.split(' ', 2)[1].split('?', 1)[1].split('&')
        params = {
            key: value for (key, value)
            in [token.split('=') for token in param_tokens]
        }
        if state != params['state']:
            self.send_message(
                'State mismatch. Expected: {} Received: {}'
                .format(state, params['state'])
            )
            raise RedditLoginFailed
        elif 'error' in params:
            self.send_message(params['error'])
            raise RedditLoginFailed

        refresh_token = self.redditInstance.auth.authorize(params['code'])
        self.send_message(
            "<script>"
            "alert(\"You can go back to terminal window now.\");"
            "</script>"
        )
        return (self.redditInstance,refresh_token)
def beginPraw(config,user_agent = str(socket.gethostname())):
    """Start reddit instance"""

    scopes = ['identity','history','read']
    port = "1337"
    arguments = {
        "client_id":GLOBAL.reddit_client_id,
        "client_secret":GLOBAL.reddit_client_secret,
        "user_agent":user_agent
    }

    if "reddit_refresh_token" in GLOBAL.config:
        arguments["refresh_token"] = GLOBAL.config["reddit_refresh_token"]
        reddit = praw.Reddit(**arguments)
        try:
            reddit.auth.scopes()
        except ResponseException:
            arguments["redirect_uri"] = "http://localhost:" + str(port)
            reddit = praw.Reddit(**arguments)
            authorizedInstance = GetAuth(reddit,port).getRefreshToken(*scopes)
            reddit = authorizedInstance[0]
            refresh_token = authorizedInstance[1]
            jsonFile("config.json").add({
                "reddit_refresh_token":refresh_token
            })
    else:
        arguments["redirect_uri"] = "http://localhost:" + str(port)
        reddit = praw.Reddit(**arguments)
        authorizedInstance = GetAuth(reddit,port).getRefreshToken(*scopes)
        reddit = authorizedInstance[0]
        refresh_token = authorizedInstance[1]
        jsonFile("config.json").add({
            "reddit_refresh_token":refresh_token
        })
    return reddit
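# The flow above in short: reuse the stored refresh token when one exists
# in config.json, and otherwise (or when reddit rejects it) run the OAuth
# loopback dance: open the authorization URL in a browser, catch the
# redirect on localhost:1337 with GetAuth, trade the code for a refresh
# token, and persist it. A minimal sketch of a caller (illustrative):
#
#   GLOBAL.config = jsonFile("config.json").read()   # assumes the file exists
#   reddit = beginPraw(GLOBAL.config)
#   print(reddit.auth.scopes())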
def getPosts(args):
    """Call PRAW according to the arguments and pass the result to
    redditSearcher. Return whatever redditSearcher returns.
    """

    config = GLOBAL.config
    reddit = beginPraw(config)

    if args["sort"] == "best":
        raise NoPrawSupport

    if "subreddit" in args:
        if "search" in args:
            if args["subreddit"] == "frontpage":
                args["subreddit"] = "all"

    if "user" in args:
        if args["user"] == "me":
            args["user"] = str(reddit.user.me())

    print("\nGETTING POSTS\n.\n.\n.\n")

    if not "search" in args:
        if args["sort"] == "top" or args["sort"] == "controversial":
            keyword_params = {
                "time_filter":args["time"],
                "limit":args["limit"]
            }
        # OTHER SORT TYPES DON'T TAKE TIME_FILTER
        else:
            keyword_params = {
                "limit":args["limit"]
            }
    else:
        keyword_params = {
            "time_filter":args["time"],
            "limit":args["limit"]
        }

    if "search" in args:
        if args["sort"] in ["hot","rising","controversial"]:
            raise InvalidSortingType

        if "subreddit" in args:
            print(
                "search for \"{search}\" in\n" \
                "subreddit: {subreddit}\nsort: {sort}\n" \
                "time: {time}\nlimit: {limit}\n".format(
                    search=args["search"],
                    limit=args["limit"],
                    sort=args["sort"],
                    subreddit=args["subreddit"],
                    time=args["time"]
                ).upper()
            )
            return redditSearcher(
                reddit.subreddit(args["subreddit"]).search(
                    args["search"],
                    limit=args["limit"],
                    sort=args["sort"],
                    time_filter=args["time"]
                )
            )

        elif "multireddit" in args:
            raise NoPrawSupport

        elif "user" in args:
            raise NoPrawSupport

        elif "saved" in args:
            raise NoRedditSupport

    if args["sort"] == "relevance":
        raise InvalidSortingType

    if "saved" in args:
        print(
            "saved posts\nuser:{username}\nlimit={limit}\n".format(
                username=reddit.user.me(),
                limit=args["limit"]
            ).upper()
        )
        return redditSearcher(reddit.user.me().saved(limit=args["limit"]))

    if "subreddit" in args:

        if args["subreddit"] == "frontpage":

            print(
                "subreddit: {subreddit}\nsort: {sort}\n" \
                "time: {time}\nlimit: {limit}\n".format(
                    limit=args["limit"],
                    sort=args["sort"],
                    subreddit=args["subreddit"],
                    time=args["time"]
                ).upper()
            )
            return redditSearcher(
                getattr(reddit.front,args["sort"])(**keyword_params)
            )

        else:
            print(
                "subreddit: {subreddit}\nsort: {sort}\n" \
                "time: {time}\nlimit: {limit}\n".format(
                    limit=args["limit"],
                    sort=args["sort"],
                    subreddit=args["subreddit"],
                    time=args["time"]
                ).upper()
            )
            return redditSearcher(
                getattr(
                    reddit.subreddit(args["subreddit"]),args["sort"]
                )(**keyword_params)
            )

    elif "multireddit" in args:
        print(
            "user: {user}\n" \
            "multireddit: {multireddit}\nsort: {sort}\n" \
            "time: {time}\nlimit: {limit}\n".format(
                user=args["user"],
                limit=args["limit"],
                sort=args["sort"],
                multireddit=args["multireddit"],
                time=args["time"]
            ).upper()
        )
        try:
            return redditSearcher(
                getattr(
                    reddit.multireddit(
                        args["user"], args["multireddit"]
                    ),args["sort"]
                )(**keyword_params)
            )
        except NotFound:
            raise MultiredditNotFound

    elif "submitted" in args:
        # TODO
        # USE REDDIT.USER.ME() INSTEAD WHEN "ME" PASSED AS A --USER
        print(
            "submitted posts of {user}\nsort: {sort}\n" \
            "time: {time}\nlimit: {limit}\n".format(
                limit=args["limit"],
                sort=args["sort"],
                user=args["user"],
                time=args["time"]
            ).upper()
        )
        return redditSearcher(
            getattr(
                reddit.redditor(args["user"]).submissions,args["sort"]
            )(**keyword_params)
        )

    elif "upvoted" in args:
        # TODO
        # USE REDDIT.USER.ME() INSTEAD WHEN "ME" PASSED AS A --USER
        print(
            "upvoted posts of {user}\nlimit: {limit}\n".format(
                user=args["user"],
                limit=args["limit"]
            ).upper()
        )
        try:
            return redditSearcher(
                reddit.redditor(args["user"]).upvoted(limit=args["limit"])
            )
        except Forbidden:
            raise InsufficientPermission

    elif "post" in args:
        print("post: {post}\n".format(post=args["post"]).upper())
        return redditSearcher(
            reddit.submission(url=args["post"]),SINGLE_POST=True
        )
def redditSearcher(posts,SINGLE_POST=False):
    """Check each post and decide whether it can be downloaded.
    If so, create a dictionary with the post's details and append it to a
    list. Write all posts to the log file. Return the list.
    """

    subList = []
    global subCount
    subCount = 0
    global orderCount
    orderCount = 0
    global gfycatCount
    gfycatCount = 0
    global imgurCount
    imgurCount = 0
    global directCount
    directCount = 0

    postsFile = createLogFile("POSTS")

    if SINGLE_POST:
        submission = posts
        subCount += 1
        try:
            details = {'postId':submission.id,
                       'postTitle':submission.title,
                       'postSubmitter':str(submission.author),
                       'postType':None,
                       'postURL':submission.url,
                       'postSubreddit':submission.subreddit.display_name}
        except AttributeError:
            pass

        postsFile.add({subCount:[details]})
        details = checkIfMatching(submission)

        if details is not None:
            if not details["postType"] == "self":
                orderCount += 1
                printSubmission(submission,subCount,orderCount)
                subList.append(details)
            else:
                postsFile.add({subCount:[details]})

    else:
        for submission in posts:
            subCount += 1

            try:
                details = {'postId':submission.id,
                           'postTitle':submission.title,
                           'postSubmitter':str(submission.author),
                           'postType':None,
                           'postURL':submission.url,
                           'postSubreddit':submission.subreddit.display_name}
            except AttributeError:
                continue

            postsFile.add({subCount:[details]})
            details = checkIfMatching(submission)

            if details is not None:
                if not details["postType"] == "self":
                    orderCount += 1
                    printSubmission(submission,subCount,orderCount)
                    subList.append(details)
                else:
                    postsFile.add({subCount:[details]})

    if not len(subList) == 0:
        print(
            "\nTotal of {} submissions found!\n"\
            "{} GFYCATs, {} IMGURs and {} DIRECTs\n"
            .format(len(subList),gfycatCount,imgurCount,directCount)
        )
        return subList
    else:
        raise NoMatchingSubmissionFound
def checkIfMatching(submission):
    global gfycatCount
    global imgurCount
    global directCount

    try:
        details = {'postId':submission.id,
                   'postTitle':submission.title,
                   'postSubmitter':str(submission.author),
                   'postType':None,
                   'postURL':submission.url,
                   'postSubreddit':submission.subreddit.display_name}
    except AttributeError:
        return None

    if ('gfycat' in submission.domain) or \
       ('imgur' in submission.domain):

        if 'gfycat' in submission.domain:
            details['postType'] = 'gfycat'
            gfycatCount += 1
            return details

        elif 'imgur' in submission.domain:
            details['postType'] = 'imgur'

            imgurCount += 1
            return details

    elif isDirectLink(submission.url) is True:
        details['postType'] = 'direct'
        directCount += 1
        return details

    elif submission.is_self:
        details['postType'] = 'self'
        return details
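# checkIfMatching tags each submission with one of four postType values;
# a sketch of the dictionary it hands back for an imgur post (illustrative
# values):
#
#   {'postId': '7xyz1a', 'postTitle': 'Some title',
#    'postSubmitter': 'someuser', 'postType': 'imgur',
#    'postURL': 'https://imgur.com/a/AbCdE', 'postSubreddit': 'pics'}
#
# Anything that is neither gfycat, imgur, a direct file link, nor a self
# post falls through and returns None, so redditSearcher skips it.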
def printSubmission(SUB,validNumber,totalNumber):
    """Print the post's link, title and media link to screen"""

    print(validNumber,end=") ")
    print(totalNumber,end=" ")
    print(
        "https://www.reddit.com/"
        +"r/"
        +SUB.subreddit.display_name
        +"/comments/"
        +SUB.id
    )
    print(" "*(len(str(validNumber))
          +(len(str(totalNumber)))+3),end="")

    try:
        print(SUB.title)
    except Exception:
        SUB.title = "unnamed"
        print("SUBMISSION NAME COULD NOT BE READ")

    print(" "*(len(str(validNumber))+(len(str(totalNumber)))+3),end="")
    print(SUB.url,end="\n\n")
def isDirectLink(URL):
    """Check whether the link is a direct image link.
    Return True if so, False otherwise.
    """

    imageTypes = ['.jpg','.png','.mp4','.webm','.gif']
    if URL[-1] == "/":
        URL = URL[:-1]

    if "i.reddituploads.com" in URL:
        return True

    for extension in imageTypes:
        if extension in URL:
            return True
    else:
        return False
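# Quick sketches of the direct-link check (illustrative):
#
#   >>> isDirectLink("https://i.redd.it/abcdef.jpg")
#   True
#   >>> isDirectLink("https://i.reddituploads.com/abcdef")
#   True
#   >>> isDirectLink("https://example.com/page")
#   False
#
# Note this matches extensions anywhere in the URL, so a query string such
# as "?format=.jpg" would also count as direct.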
148    src/tools.py    Normal file
@@ -0,0 +1,148 @@
import io
import json
import sys
import time

try:
    from pip import main as pipmain
except ImportError:
    from pip._internal import main as pipmain

from os import makedirs, path, remove
from pathlib import Path

from src.errors import FileNotFoundError

def install(package):
    pipmain(['install', package])

class GLOBAL:
    """Declare global variables"""

    RUN_TIME = 0
    config = None
    arguments = None
    directory = None
    reddit_client_id = "Jx3iqqGkSmE5sg"
    reddit_client_secret = "PFzVAVRLN78JI48e3bQ5KsgLZp4"
    printVanilla = print
class jsonFile:
    """Write and read JSON files.

    Use add(self,toBeAdded) to add to files

    Use delete(self,*deleteKeys) to delete keys
    """

    FILEDIR = ""

    def __init__(self,FILEDIR):
        self.FILEDIR = FILEDIR
        if not path.exists(self.FILEDIR):
            self.__writeToFile({},create=True)

    def read(self):
        with open(self.FILEDIR, 'r') as f:
            return json.load(f)

    def add(self,toBeAdded):
        """Take a dictionary and merge it with the json file.
        The new key's value is used if a key already exists.
        Return the new content as a dictionary.
        """

        data = self.read()
        data = {**data, **toBeAdded}
        self.__writeToFile(data)
        return self.read()

    def delete(self,*deleteKeys):
        """Delete the given keys from the JSON file.
        Return False if none of the keys were found.
        """

        found = False
        data = self.read()
        for deleteKey in deleteKeys:
            if deleteKey in data:
                del data[deleteKey]
                found = True
        if not found:
            return False
        self.__writeToFile(data)

    def __writeToFile(self,content,create=False):
        if not create:
            remove(self.FILEDIR)
        with open(self.FILEDIR, 'w') as f:
            json.dump(content, f, indent=4)
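# Illustrative round trip (hypothetical file name):
#
#   settings = jsonFile("settings.json")     # creates {} on first use
#   settings.add({"limit": 25})              # {'limit': 25}
#   settings.add({"limit": 50})              # new value wins: {'limit': 50}
#   settings.delete("limit")                 # file becomes {}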
def createLogFile(TITLE):
    """Create a log file with the given name
    inside a time-stamped folder and
    put the command-line arguments under the \"HEADER\" key
    """

    folderDirectory = GLOBAL.directory / str(time.strftime("%d-%m-%Y_%H-%M-%S",
                                             time.localtime(GLOBAL.RUN_TIME)))
    logFilename = TITLE.upper()+'.json'

    if not path.exists(folderDirectory):
        makedirs(folderDirectory)

    FILE = jsonFile(folderDirectory / Path(logFilename))
    HEADER = " ".join(sys.argv)
    FILE.add({"HEADER":HEADER})

    return FILE
def printToFile(*args, **kwargs):
    """Print to both the console and
    a CONSOLE_LOG file inside a time-stamped folder
    """

    TIME = str(time.strftime("%d-%m-%Y_%H-%M-%S",
               time.localtime(GLOBAL.RUN_TIME)))
    folderDirectory = GLOBAL.directory / TIME
    print(*args,**kwargs)

    if not path.exists(folderDirectory):
        makedirs(folderDirectory)

    with io.open(
        folderDirectory / "CONSOLE_LOG.txt","a",encoding="utf-8"
    ) as FILE:
        print(*args, file=FILE, **kwargs)
def nameCorrector(string):
    """Swap problematic characters in the given string
    for underscores (_) and shorten it.
    Return the string
    """

    stringLength = len(string)
    if stringLength > 200:
        string = string[:200]
        stringLength = len(string)
    spacesRemoved = []

    for b in range(stringLength):
        if string[b] == " ":
            spacesRemoved.append("_")
        else:
            spacesRemoved.append(string[b])

    string = ''.join(spacesRemoved)
    correctedString = []

    if len(string.split('\n')) > 1:
        string = "".join(string.split('\n'))

    BAD_CHARS = ['\\','/',':','*','?','"','<','>','|','.']

    if any(x in string for x in BAD_CHARS):
        for char in string:
            if char in BAD_CHARS:
                string = string.replace(char,"_")

    return string
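# A sketch of the cleanup nameCorrector performs (illustrative):
#
#   >>> nameCorrector('Cats: "before/after"?')
#   'Cats___before_after__'
#
# Spaces become underscores first, then every character in BAD_CHARS is
# replaced, so the result is always safe as a file or folder name on
# Windows as well as Unix.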