import requests import json import time import random import sys from steamrelationships.constants import _Const CONST = _Const() class SteamRelationships: """A class that handles the querring of Steam's web api to request a user's friends list""" def __init__(self, webapikey: str, timeout: int = 30, timeout_retries: int = 5, reqdelay: float = 0.5, delayrand: int = 25, reqsafetybuffer: float = 0.9, reqjson: str = f"{sys.path[0]}/requests.json") -> None: """ (str) webapikey - A Steam "web api key" for grabbing friends lists (get one at https://steamcommunity.com/dev/apikey) (int) timeout (Default: 30) - The time in seconds to wait for a response from Steam before timing out (int) timeout_retries (Default: 5) - The number of times to retry a scan after a timeout (float) reqdelay (Default: 0.5) - The base amount of seconds to wait after each scan (to lessen the load on Steam's servers) (int) delayrand (Default: 25) - The maximum percent up/down to add/subtract to the request delay (float) reqsafetybuffer (Default: 0.9) - A multiplier applied to Steam's max API request limit of 100,000. At 0.9, the new limit is 90,000 (str) reqjson (Default: "requests.json") - The path to a .json file that stores request numbers If a value is entered and is of the incorrect type / can't be automatically converted, SteamRelationships will raise a type error All numerical values should be positive, and will be made positive automatically if otherwise delayrand will be truncated to between [0, 99] (inclusive) if over 100 """ # This used to be in a try-except block, but I decided that it should error out on a proper error self.webapikey: str = str(webapikey) self.timeout: int = int(abs(timeout)) self.timeout_retries: int = int(abs(timeout_retries)) self.reqdelay: float = float(abs(reqdelay)) self.delayrand: int = int(abs(delayrand) % 100) # clamped to 0-99 self.reqsafetybuffer: float = float(abs(reqsafetybuffer)) self.reqjson: str = str(reqjson) # Not specified by user, but necessary regardless self.session: requests.Session = requests.Session() # Session object so repeated querries to steam's api use the same TCP connection self.scanlist: dict = {} # To be populated by scan functions def __str__(self) -> str: return f"Vals:\n\twebapikey: {len(self.webapikey) > 0} (Actual value ommitted for security)\n\n\tTimeout: {self.timeout}\n\tTimeout Retries: {self.timeout_retries}\n\tRequest Delay: {self.reqdelay}\n\tRequest Delay Randomness Factor: +/- {self.delayrand}%\n\tRequest Safety Buffer: {self.reqsafetybuffer} ({self.reqsafetybuffer * CONST.STEAMAPI_MAXREQ} out of {CONST.STEAMAPI_MAXREQ} max requests)\n\tRequests Log Filepath: \"{self.reqjson}\"\n\n\tMost Recent Scan: {self.scanlist}" def __readjsonfile(self, filepath: str = "") -> dict: """ __readjsonfile(self, filepath: str = "") -> dict: Read the specified json file for previous requests filepath: Path to json file dict (return): The contents of the json file. Empty on error __readjsonfile will create an "empty" json file if the specified file at the filepath doesn't exist """ if not filepath: filepath = self.reqjson final: dict = {} # Try to read the contents of the given file try: with open(filepath, "r+") as jsonfile: final = json.load(jsonfile) jsonfile.close() # If the file does not exist, create one and slap an empty list in it except FileNotFoundError: print(f"[__readjsonfile] File {filepath} does not exist. Generating empty json file...") try: with open(filepath, "w+") as newfile: json.dump({time.time_ns(): [0, []]}, newfile, indent=CONST.JSON_INDENT) newfile.close() return self.__readjsonfile(filepath) except Exception as e: print(f"[__readjsonfile] Couldn't create new file ({e})") return {} except Exception as e: print(f"[__readjsonfile] Other unknown error occured ({e})") return {} return final def __checkrequests(self, filename: str = "", increment: bool = True) -> int: """ __checkrequests(self, filename: str = "", increment: bool = True) -> int: Check the requests log to make sure Steam's request limit hasn't been passed filename: filepath to requests log increment: Whether to increment the number of requests or not int (return): The number of requests in the last 24 hours. -1 on error, 0 on too many requests, and >1 if a valid number of requests __checkrequests will create a file at filepath if it doesn't exist. It will also never go over 100,000 requests, regardless of what reqsafetybuffer is """ if not filename: filename = self.reqjson # Get the contents of the specified file jsoncontents: dict = {} try: jsoncontents = self.__readjsonfile(filename) except Exception as e: print(f"[__checkrequests] Could not get the contents of file \"{filename}\" ({e})") return -1 # Check the current date. If over 1 day since last entry, add a new entry. Otherwise, edit the current day's entry [note, 1 day in nanoseconds = (8.64 * (10 ** 13)) ] checktime = time.time_ns() if (checktime - int(list(jsoncontents.keys())[-1])) > CONST.DAY_IN_NANO: jsoncontents[checktime] = [0, []] else: # This bullshit brought to you by: ordered dictionaries currentreqs = jsoncontents[list(jsoncontents.keys())[-1]][0] if currentreqs < (CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer) and currentreqs < CONST.STEAMAPI_MAXREQ: if increment == True: jsoncontents[list(jsoncontents.keys())[-1]][0] += 1 jsoncontents[list(jsoncontents.keys())[-1]][1].append(checktime) else: print(f"[__checkrequests] Daily request limit reached ({currentreqs}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})") return 0 # Update the json file if increment == True: try: with open(filename, "w+t") as jsonfile: json.dump(jsoncontents, jsonfile, indent=CONST.JSON_INDENT) jsonfile.close() except Exception as e: print(f"[__checkrequests] Could not update json file ({e})") return -1 return jsoncontents[list(jsoncontents.keys())[-1]][0] def __getFriendsList(self, steamid64: str, _retries: int = 0) -> dict: """ __getFriendsList(self, steamid64: str, _retries: int = 0) -> dict: Send a request to the Steam Web API to get a user's friends list steamid64: A Steam User's id, in the steamid64 format _retries: An internal value used to limit the number of retries after a timeout. Increments by one automatically on every timeout dict (return): The json representation of Steam's response. Empty on error """ if not steamid64: print("[__getFriendsList] No steamid64 given") return {} # Make sure we haven't gone over steam's daily max if self.__checkrequests() == 0: print("[__getFriendsList] Max requests reached, refusing to contact steam") return {} url: str = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/" options: dict = {"key": self.webapikey, "steamid": steamid64, "format": "json"} result: object = None # Contact steam try: result = self.session.get(url, params=options, timeout=self.timeout) except requests.exceptions.Timeout: print(f"[__getFriendsList] Request timed out (No response for {self.timeout} seconds)") if _retries <= self.timeout_retries: print(f"[__getFriendsList] Retrying request... (Attempt {_retries}/{self.timeout_retries})") return self.__getFriendsList(steamid64, _retries + 1) print("[__getFriendsList] Retry limit reached") return {} except Exception as e: print(f"[__getFriendsList] Other error in contacting steam ({e})") return {} # Error out on request error if result.status_code != requests.codes.ok: print(f"[__getFriendsList] Got bad status code (Requested id: {steamid64}, status: {result.status_code})", end="") if result.status_code == 401: # Steam returns a 401 on profiles with private friends lists print(f". It is likely that the requested id has a private profile / private friends list", end="") print("\n", end="") return {} # Get the json contents from the response resultjson: dict = {} try: resultjson = result.json() except requests.exceptions.JSONDecodeError: print("[__getFriendsList] Could not decode json response for some reason") return {} except Exception as e: print(f"[__getFriendsList] Unknown error in getting json response ({e})") return {} return resultjson def __parseFriendsList(self, friendsdict: dict) -> list: """ __parseFriendsList(self, friendsdict: dict) -> list: Parse a response from Steam and extract a user's friend's Steam IDs friendsdict: The return value of __getFriendsList list (return): The steamid64's of a user's friends. Empty on error """ if not friendsdict: print("[__parseFriendsList] Empty friends dict given") return [] people: list = [] try: for friend in friendsdict["friendslist"]["friends"]: people.append(f'{friend["steamid"]}') except Exception as e: print("[__parseFriendsList] Error parsing friendsdict ({e})") people.clear() return people def basicscan(self, steamid64: str) -> dict: ''' basicscan - do a basic scan of someone's steam friends PARAMS: (str) steamid64 - The 64 bit steam id of the user you want to scan RETURN VALUES: (dict) EMPTY - There was an error scanning the user's friends list (dict) {steamid64: [friendID1, friendID2, ...]} - A dict with a single key, that of the scanned user, which maps to a list of the users's friends ''' self.scanlist = {steamid64: self.__parseFriendsList(self.__getFriendsList(steamid64))} return self.scanlist def recursivescan(self, steamid64: str, recurselevel: int = 2) -> dict: ''' recursivescan - Scan a user's friends list, then scan their friends as well PARAMS: (str) steamid64 - The starting user to scan (int) recurselevel - The number of recursive scans to complete > Note: 0 is equivalent to a basic scan; 1 scans the specified user, then the user's friends; etc. RETURN VALUES: (dict) EMPTY - Some catastrophic error has occured and no scan could be started (dict) { steamid64: [friend1id, friend2id, friend3id, ...], friend1id: [other_friend, other_friend, ...], friend2id: [other_friend, other_friend, ...], friend3id: [other_friend, other_friend, ...], ... } - A dict containing the starting steamid, then the friends contained in the original scan with the results of their scan NOTE: Please do not use a value greater than 3. While theoretically any value works, due to the exponential nature of friendship relations, you will very quickly spam Steam with tens of thousands of requests. If this concept is unfamiliar to you, please take a quick glance at the Wikipedia page for "six degress of separation": https://en.wikipedia.org/wiki/Six_degrees_of_separation TLDR: recursivescan is exponential and you will reach 100,000 requests very quickly if you recurse greater than 3 ''' testlist: dict = self.basicscan(steamid64) alreadyscanned: list = [steamid64] for i in range(recurselevel): tempdict: dict = {} for person in testlist: for friend in testlist[person]: if friend not in alreadyscanned: tempdict.update(self.basicscan(friend)) alreadyscanned.append(friend) sleepytime: float = abs(random.randrange(100 - self.delayrand, 100 + self.delayrand) / 100 * self.reqdelay) time.sleep(sleepytime) testlist.update(tempdict) tempdict.clear() self.scanlist = testlist return self.scanlist