import requests import json import time from steamrelationships.constants import _Const CONST = _Const() class SteamRelationships: session: object = requests.Session() # Session object so repeated querries to steam's api use the same TCP connection scanlist: list = [] # To be populated by recurse() reqjson: str = "requests.json" # Path to the JSON file that handles requests def __init__(self, webapikey: str, timeout: int = 30, timeout_retries: int = 5, reqdelay: float = 0.5, delayrand: float = 1.25, reqsafetybuffer: float = 0.9, reqjson: str = "requests.json") -> None: """ (str/int) webapikey - Steam dev api key required to use Steam's ISteamUser/GetFriendList interface Request Options: (int) timeout (Default: 30) - Seconds to wait before timing out a request. (int) timeout_retries (Default: 5) Number of times to retry a request after timing out (float) reqdelay (Default: 0.5) - Default amount of seconds to wait between sending each request to Steam (float) delayrand (Default: 1.25) - The max percentage of extra delay to randomly add to each request (1 = no randomness, <1 = randomly shorter, >1 = randomly longer) (float) reqsafetybuffer (Default: 0.9) - Highest percent of Steam's API request limit you are willing to run > Steam has an API request limit of 100,000 requests per day. reqsafetybuffer = 0.9 means SteamRequests will send a max of 90,000 requests per day. Entering a number higher than 1 will not result in sending more than 100,000 requests per day (str) reqjson (Default: "requests.json") - The name/filepath of the file to store request times in """ # I can't make this a formatted string :sob: self.webapikey = webapikey self.timeout = timeout self.timeout_retries = timeout_retries self.reqdelay = reqdelay self.delayrand = delayrand self.reqsafetybuffer = reqsafetybuffer self.reqjson = reqjson def _readjsonfile(self, filepath: str = "") -> dict: if not filepath: filepath = self.reqjson final: dict = {} # Try to read the contents of the given file try: with open(filepath, "r+") as jsonfile: final = json.load(jsonfile) jsonfile.close() # If the file does not exist, create one and slap an empty list in it except FileNotFoundError: print(f"[_readjsonfile] File {filepath} does not exist. Generating empty json file...") try: with open(filepath, "w+") as newfile: json.dump({time.time_ns(): [0, []]}, newfile, indent=CONST.JSON_INDENT) newfile.close() return self._readjsonfile(filepath) except Exception as e: print(f"[_readjsonfile] Couldn't create new file ({e})") return {} except Exception as e: print(f"[_readjsonfile] Other unknown error occured ({e})") return {} return final def _checkrequests(self, filename: str = "") -> int: if not filename: filename = self.reqjson # Get the contents of the specified file jsoncontents: dict = {} try: jsoncontents = self._readjsonfile(filename) except Exception as e: print(f"[_checkrequests] Could not get the contents of file {filename} ({e})") return -1 # Check the current date. If over 1 day since last entry, add a new entry. Otherwise, edit the current day's entry [note, 1 day in nanoseconds = (8.64 * (10 ** 13)) ] checktime = time.time_ns() if (checktime - int(list(jsoncontents.keys())[-1])) > CONST.DAY_IN_NANO: jsoncontents[checktime] = [0, []] else: # This bullshit brought to you by: ordered dictionaries currentreqs = jsoncontents[list(jsoncontents.keys())[-1]][0] if currentreqs < (CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer) and currentreqs < CONST.STEAMAPI_MAXREQ: jsoncontents[list(jsoncontents.keys())[-1]][0] += 1 jsoncontents[list(jsoncontents.keys())[-1]][1].append(checktime) else: print(f"[_checkrequests] Daily request limit reached ({jsoncontents[list(jsoncontents.keys())[-1]][0]}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})") return 0 # Update the json file try: with open(filename, "w+t") as jsonfile: json.dump(jsoncontents, jsonfile, indent=CONST.JSON_INDENT) jsonfile.close() except Exception as e: print("[_checkrequests] Could not update json file ({e})") return -1 return jsoncontents[list(jsoncontents.keys())[-1]][0] def _getFriendsList(self, steamid64: str, retrys: int = 0) -> dict: if not steamid64: print("[_getFriendsList] No steamid64 given") return {} # Make sure we haven't gone over steam's daily max if self._checkrequests() == 0: print("[_getFriendsList] Max requests reached, refusing to contact steam") return {} url: str = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/" options: dict = {"key": self.webapikey, "steamid": steamid64, "format": "json"} result: object = None # Contact steam try: result = self.session.get(url, params=options, timeout=self.timeout) except requests.exceptions.Timeout: print(f"[_getFriendsList] Request timed out (No response for {self.timeout} seconds)") if retrys <= self.timeout_retries: print(f"[_getFriendsList] Retrying request... (Attempt {retrys}/{self.timeout_retries})") return self._getFriendsList(steamid64, retrys + 1) print("[_getFriendsList] Retry limit reached") return {} except Exception as e: print(f"[_getFriendsList] Other error in contacting steam ({e})") return {} # Error out on request error if result.status_code != requests.codes.ok: print("[_getFriendsList] Got bad status code") return {} # Get the json contents from the response resultjson: dict = {} try: resultjson = result.json() except requests.exceptions.JSONDecodeError: print("[_getFriendsList] Could not decode json response for some reason") return {} except Exception as e: print(f"[_getFriendsList] Unknown error in getting json response ({e})") return {} return resultjson def _parseFriendsList(self, friendsdict: dict) -> list: if not friendsdict: print("[_parseFriendsList] Empty friends dict given") return [] people: list = [] try: for friend in friendsdict["friendslist"]["friends"]: people.append(f'{friend["steamid"]}') except Exception as e: print("[_parseFriendsList] Error parsing friendsdict ({e})") people.clear() return people def basicscan(self, steamid64: str) -> dict: return {steamid64: self._parseFriendsList(self._getFriendsList(steamid64))}