import json
import time

import requests

from steamrelationships.constants import _Const

CONST = _Const()


class SteamRelationships:
    """Scan Steam friend lists through the ISteamUser/GetFriendList web API.

    Every outgoing request is counted in a JSON file (see ``reqjson``) so the
    number of requests sent per day can be kept under a configurable share of
    Steam's daily API limit (``CONST.STEAMAPI_MAXREQ``).
    """

    # Shared by all instances on purpose, so repeated queries to Steam's API
    # reuse the same TCP connection.
    session: requests.Session = requests.Session()
    # Class-level default only; __init__ gives every instance its own dict.
    # Populated by recursivescan().
    scanlist: dict = {}
    # Path to the JSON file that keeps track of the requests that were sent.
    reqjson: str = "requests.json"

    def __init__(self, webapikey: str, timeout: int = 30, timeout_retries: int = 5,
                 reqdelay: float = 0.5, delayrand: float = 1.25,
                 reqsafetybuffer: float = 0.9, reqjson: str = "requests.json") -> None:
        """
        (str/int) webapikey - Steam dev api key required to use Steam's
                              ISteamUser/GetFriendList interface

        Request Options:
        (int) timeout (Default: 30) - Seconds to wait before timing out a request.
        (int) timeout_retries (Default: 5) - Number of times to retry a request
                              after timing out
        (float) reqdelay (Default: 0.5) - Default amount of seconds to wait
                              between sending each request to Steam
        (float) delayrand (Default: 1.25) - The max percentage of extra delay to
                              randomly add to each request
                              (1 = no randomness, <1 = randomly shorter,
                              >1 = randomly longer)
        (float) reqsafetybuffer (Default: 0.9) - Highest percent of Steam's API
                              request limit you are willing to run
            > Steam has an API request limit of 100,000 requests per day.
              reqsafetybuffer = 0.9 means SteamRequests will send a max of
              90,000 requests per day. Entering a number higher than 1 will not
              result in sending more than 100,000 requests per day
        (str) reqjson (Default: "requests.json") - The name/filepath of the file
                              to store request times in

        NOTE: reqdelay and delayrand are only stored here; nothing in this class
        reads them yet.
        """
        # NOTE: a docstring cannot be an f-string, so the defaults listed above
        # are written out by hand and must be kept in sync with the signature.
        self.webapikey = webapikey
        self.timeout = timeout
        self.timeout_retries = timeout_retries
        self.reqdelay = reqdelay
        self.delayrand = delayrand
        self.reqsafetybuffer = reqsafetybuffer
        self.reqjson = reqjson
        # Per-instance result store, so two scanners never share (and silently
        # merge) their results through the mutable class attribute.
        self.scanlist = {}

    def _readjsonfile(self, filepath: str = "") -> dict:
        """Return the request log stored in ``filepath`` (default: self.reqjson).

        The log maps a nanosecond timestamp (as a string) to
        ``[request_count, [request_timestamps]]``. If the file does not exist it
        is created with a single empty entry stamped with the current time.

        RETURN VALUES:
            (dict) EMPTY - The file could not be read, parsed or created
            (dict) The parsed (or freshly created) request log
        """
        if not filepath:
            filepath = self.reqjson

        try:
            # Read-only is enough here, and also works on files we can't write to
            with open(filepath, "r") as jsonfile:
                contents = json.load(jsonfile)
        except FileNotFoundError:
            print(f"[_readjsonfile] File {filepath} does not exist. Generating empty json file...")
        except Exception as e:
            # Deliberately broad: callers rely on this method never raising
            print(f"[_readjsonfile] Other unknown error occured ({e})")
            return {}
        else:
            if not isinstance(contents, dict):
                print(f"[_readjsonfile] File {filepath} does not contain a json object")
                return {}
            return contents

        # The file is missing: create it with one empty entry for "today".
        # JSON object keys are always strings, so use a string key right away.
        fresh: dict = {str(time.time_ns()): [0, []]}
        try:
            with open(filepath, "w") as newfile:
                json.dump(fresh, newfile, indent=CONST.JSON_INDENT)
        except Exception as e:
            print(f"[_readjsonfile] Couldn't create new file ({e})")
            return {}
        return fresh

    def _checkrequests(self, filename: str = "") -> int:
        """Register one outgoing request in the request log, if the limit allows.

        The newest entry of the log is "today". If it is more than one day old
        (CONST.DAY_IN_NANO) a new entry is started; the request is then counted
        in that entry and the log is written back to ``filename``.

        RETURN VALUES:
            (int) -1 - The log could not be read, understood or written
            (int)  0 - The daily request limit has been reached; nothing was
                       recorded and no request should be sent
            (int) >0 - Number of requests recorded for the current day,
                       including this one
        """
        if not filename:
            filename = self.reqjson

        # Get the contents of the specified file
        try:
            jsoncontents: dict = self._readjsonfile(filename)
        except Exception as e:
            print(f"[_checkrequests] Could not get the contents of file {filename} ({e})")
            return -1

        # _readjsonfile returns an empty dict on any failure; there is no
        # "latest entry" to look at in that case.
        if not jsoncontents:
            print(f"[_checkrequests] Could not get the contents of file {filename} (no entries)")
            return -1

        checktime = time.time_ns()
        # Dicts keep insertion order, so the last key is the newest entry
        latest = list(jsoncontents)[-1]
        try:
            newday = (checktime - int(latest)) > CONST.DAY_IN_NANO
        except (TypeError, ValueError) as e:
            print(f"[_checkrequests] Could not get the contents of file {filename} ({e})")
            return -1

        if newday:
            # Over a day since the last entry: start a new one. It is filled in
            # below like any other entry, so the first request of the day is
            # counted (and does not return 0, which means "limit reached").
            latest = str(checktime)
            jsoncontents[latest] = [0, []]

        entry = jsoncontents[latest]
        # Never go over Steam's hard limit, even if reqsafetybuffer is > 1
        limit = min(CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer, CONST.STEAMAPI_MAXREQ)
        if entry[0] >= limit:
            print(f"[_checkrequests] Daily request limit reached ({entry[0]}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})")
            return 0

        entry[0] += 1
        entry[1].append(checktime)

        # Update the json file
        try:
            with open(filename, "w") as jsonfile:
                json.dump(jsoncontents, jsonfile, indent=CONST.JSON_INDENT)
        except Exception as e:
            print(f"[_checkrequests] Could not update json file ({e})")
            return -1

        return entry[0]

    def _getFriendsList(self, steamid64: str, retrys: int = 0) -> dict:
        """Ask Steam for the friends list of ``steamid64``.

        PARAMS:
            (str) steamid64 - The 64 bit steam id of the user to look up
            (int) retrys - Number of retries already spent on this request
                           (used internally when a request times out)

        RETURN VALUES:
            (dict) EMPTY - No id given, request limit reached, the request
                           failed/timed out too often, or the response was bad
            (dict) The decoded json response from Steam
        """
        if not steamid64:
            print("[_getFriendsList] No steamid64 given")
            return {}

        # Make sure we haven't gone over steam's daily max.
        # Only 0 blocks the request; -1 (log unavailable) does not.
        if self._checkrequests() == 0:
            print("[_getFriendsList] Max requests reached, refusing to contact steam")
            return {}

        url: str = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/"
        options: dict = {"key": self.webapikey, "steamid": steamid64, "format": "json"}

        # Contact steam
        try:
            result = self.session.get(url, params=options, timeout=self.timeout)
        except requests.exceptions.Timeout:
            print(f"[_getFriendsList] Request timed out (No response for {self.timeout} seconds)")
            # retrys counts the retries already made, so exactly
            # timeout_retries retries are attempted in total
            if retrys < self.timeout_retries:
                print(f"[_getFriendsList] Retrying request... (Attempt {retrys + 1}/{self.timeout_retries})")
                return self._getFriendsList(steamid64, retrys + 1)
            print("[_getFriendsList] Retry limit reached")
            return {}
        except Exception as e:
            print(f"[_getFriendsList] Other error in contacting steam ({e})")
            return {}

        # Error out on request error
        if result.status_code != requests.codes.ok:
            print(f"[_getFriendsList] Got bad status code (Requested id: {steamid64}, status: {result.status_code})")
            return {}

        # Get the json contents from the response
        try:
            return result.json()
        except requests.exceptions.JSONDecodeError:
            print("[_getFriendsList] Could not decode json response for some reason")
        except Exception as e:
            print(f"[_getFriendsList] Unknown error in getting json response ({e})")
        return {}

    def _parseFriendsList(self, friendsdict: dict) -> list:
        """Extract the steam ids out of a GetFriendList response.

        PARAMS:
            (dict) friendsdict - Response shaped like
                                 {"friendslist": {"friends": [{"steamid": ...}, ...]}}

        RETURN VALUES:
            (list) EMPTY - Empty input, or the input did not have that shape
            (list) [friendID1, friendID2, ...] - The friends' ids as strings
        """
        if not friendsdict:
            print("[_parseFriendsList] Empty friends dict given")
            return []

        try:
            return [f'{friend["steamid"]}' for friend in friendsdict["friendslist"]["friends"]]
        except (KeyError, TypeError) as e:
            # All or nothing: a malformed response yields no friends at all
            print(f"[_parseFriendsList] Error parsing friendsdict ({e})")
            return []

    def basicscan(self, steamid64: str) -> dict:
        '''
        basicscan - do a basic scan of someone's steam friends

        PARAMS:
            (str) steamid64 - The 64 bit steam id of the user you want to scan

        RETURN VALUES:
            (dict) {steamid64: []} - There was an error scanning the user's
                friends list (or the list came back empty)
            (dict) {steamid64: [friendID1, friendID2, ...]} - A dict with a
                single key, that of the scanned user, which maps to a list of
                the user's friends
        '''
        return {steamid64: self._parseFriendsList(self._getFriendsList(steamid64))}

    def recursivescan(self, steamid64: str, recurselevel: int = 2) -> dict:
        '''
        recursivescan - Scan a user's friends list, then scan their friends as well

        PARAMS:
            (str) steamid64 - The starting user to scan
            (int) recurselevel - The number of recursive scans to complete
                after the scan of the starting user

        RETURN VALUES:
            (dict) {steamid64: []} - The starting user could not be scanned
            (dict) {
                steamid64: [friend1id, friend2id, friend3id, ...],
                friend1id: [other_friend, other_friend, ...],
                friend2id: [other_friend, other_friend, ...],
                ...
            } - A dict containing the starting steamid, then the friends
                contained in the original scan with the results of their scan

        The result is also merged into self.scanlist.

        NOTE: Please do not use a value greater than 3. While theoretically any
        value works, due to the exponential nature of friendship relations, you
        will very quickly spam Steam with tens of thousands of requests. If this
        concept is unfamiliar to you, please take a quick glance at the
        Wikipedia page for "six degrees of separation":
        https://en.wikipedia.org/wiki/Six_degrees_of_separation
        '''
        testlist: dict = self.basicscan(steamid64)
        # A set keeps the "did we scan this id yet?" check cheap on big graphs
        alreadyscanned: set = {steamid64}

        for _ in range(recurselevel):
            # Collected separately, because testlist can't grow while we loop over it
            newscans: dict = {}
            for friends in testlist.values():
                for friend in friends:
                    if friend not in alreadyscanned:
                        newscans.update(self.basicscan(friend))
                        alreadyscanned.add(friend)
            if not newscans:
                # Nobody new was found, deeper levels can't find anyone either
                break
            testlist.update(newscans)

        self.scanlist.update(testlist)
        return testlist