import requests import json import time import random from steamrelationships.constants import _Const CONST = _Const() class SteamRelationships: session: object = requests.Session() # Session object so repeated querries to steam's api use the same TCP connection scanlist: dict = {} # To be populated by recursivescan() def __init__(self, webapikey: str, timeout: int = 30, timeout_retries: int = 5, reqdelay: float = 0.5, delayrand: int = 25, reqsafetybuffer: float = 0.9, reqjson: str = "requests.json") -> None: """ """ # I can't make this a formatted string :sob: # This used to be in a try-except block, but I decided that it should error out on a proper error self.webapikey: str = str(webapikey) self.timeout: int = int(abs(timeout)) self.timeout_retries: int = int(abs(timeout_retries)) self.reqdelay: float = float(abs(reqdelay)) self.delayrand: int = int(abs(delayrand) % 100) self.reqsafetybuffer: float = float(abs(reqsafetybuffer)) self.reqjson: str = str(reqjson) def _readjsonfile(self, filepath: str = "") -> dict: if not filepath: filepath = self.reqjson final: dict = {} # Try to read the contents of the given file try: with open(filepath, "r+") as jsonfile: final = json.load(jsonfile) jsonfile.close() # If the file does not exist, create one and slap an empty list in it except FileNotFoundError: print(f"[_readjsonfile] File {filepath} does not exist. Generating empty json file...") try: with open(filepath, "w+") as newfile: json.dump({time.time_ns(): [0, []]}, newfile, indent=CONST.JSON_INDENT) newfile.close() return self._readjsonfile(filepath) except Exception as e: print(f"[_readjsonfile] Couldn't create new file ({e})") return {} except Exception as e: print(f"[_readjsonfile] Other unknown error occured ({e})") return {} return final def _checkrequests(self, filename: str = "") -> int: if not filename: filename = self.reqjson # Get the contents of the specified file jsoncontents: dict = {} try: jsoncontents = self._readjsonfile(filename) except Exception as e: print(f"[_checkrequests] Could not get the contents of file \"{filename}\" ({e})") return -1 # Check the current date. If over 1 day since last entry, add a new entry. Otherwise, edit the current day's entry [note, 1 day in nanoseconds = (8.64 * (10 ** 13)) ] checktime = time.time_ns() if (checktime - int(list(jsoncontents.keys())[-1])) > CONST.DAY_IN_NANO: jsoncontents[checktime] = [0, []] else: # This bullshit brought to you by: ordered dictionaries currentreqs = jsoncontents[list(jsoncontents.keys())[-1]][0] if currentreqs < (CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer) and currentreqs < CONST.STEAMAPI_MAXREQ: jsoncontents[list(jsoncontents.keys())[-1]][0] += 1 jsoncontents[list(jsoncontents.keys())[-1]][1].append(checktime) else: print(f"[_checkrequests] Daily request limit reached ({currentreqs}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})") return 0 # Update the json file try: with open(filename, "w+t") as jsonfile: json.dump(jsoncontents, jsonfile, indent=CONST.JSON_INDENT) jsonfile.close() except Exception as e: print(f"[_checkrequests] Could not update json file ({e})") return -1 return jsoncontents[list(jsoncontents.keys())[-1]][0] def _getFriendsList(self, steamid64: str, retrys: int = 0) -> dict: if not steamid64: print("[_getFriendsList] No steamid64 given") return {} # Make sure we haven't gone over steam's daily max if self._checkrequests() == 0: print("[_getFriendsList] Max requests reached, refusing to contact steam") return {} url: str = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/" options: dict = {"key": self.webapikey, "steamid": steamid64, "format": "json"} result: object = None # Contact steam try: result = self.session.get(url, params=options, timeout=self.timeout) except requests.exceptions.Timeout: print(f"[_getFriendsList] Request timed out (No response for {self.timeout} seconds)") if retrys <= self.timeout_retries: print(f"[_getFriendsList] Retrying request... (Attempt {retrys}/{self.timeout_retries})") return self._getFriendsList(steamid64, retrys + 1) print("[_getFriendsList] Retry limit reached") return {} except Exception as e: print(f"[_getFriendsList] Other error in contacting steam ({e})") return {} # Error out on request error if result.status_code != requests.codes.ok: print(f"[_getFriendsList] Got bad status code (Requested id: {steamid64}, status: {result.status_code})", end="") if result.status_code == 401: # Steam returns a 401 on profiles with private friends lists print(f". It is likely that the requested id has a private profile / private friends list", end="") print("\n", end="") return {} # Get the json contents from the response resultjson: dict = {} try: resultjson = result.json() except requests.exceptions.JSONDecodeError: print("[_getFriendsList] Could not decode json response for some reason") return {} except Exception as e: print(f"[_getFriendsList] Unknown error in getting json response ({e})") return {} return resultjson def _parseFriendsList(self, friendsdict: dict) -> list: if not friendsdict: print("[_parseFriendsList] Empty friends dict given") return [] people: list = [] try: for friend in friendsdict["friendslist"]["friends"]: people.append(f'{friend["steamid"]}') except Exception as e: print("[_parseFriendsList] Error parsing friendsdict ({e})") people.clear() return people def basicscan(self, steamid64: str) -> dict: ''' basicscan - do a basic scan of someone's steam friends PARAMS: (str) steamid64 - The 64 bit steam id of the user you want to scan RETURN VALUES: (dict) EMPTY - There was an error scanning the user's friends list (dict) {steamid64: [friendID1, friendID2, ...]} - A dict with a single key, that of the scanned user, which maps to a list of the users's friends ''' return {steamid64: self._parseFriendsList(self._getFriendsList(steamid64))} def recursivescan(self, steamid64: str, recurselevel: int = 2) -> dict: ''' recursivescan - Scan a user's friends list, then scan their friends as well PARAMS: (str) steamid64 - The starting user to scan (int) recurselevel - The number of recursive scans to complete RETURN VALUES: (dict) EMPTY - Some catastrophic error has occured and no scan could be started (dict) { steamid64: [friend1id, friend2id, friend3id, ...], friend1id: [other_friend, other_friend, ...], friend2id: [other_friend, other_friend, ...], ... } - A dict containing the starting steamid, then the friends contained in the original scan with the results of their scan NOTE: Please do not use a value greater than 3. While theoretically any value works, due to the exponential nature of friendship relations, you will very quickly spam Steam with tens of thousands of requests. If this concept is unfamiliar to you, please take a quick glance at the Wikipedia page for "six degress of separation": https://en.wikipedia.org/wiki/Six_degrees_of_separation TLDR: recursivescan is exponential and you will reach 100,000 requests very quickly if you recurse greater than 3 ''' testlist: dict = self.basicscan(steamid64) alreadyscanned: list = [steamid64] for i in range(recurselevel): tempdict: dict = {} for person in testlist: for friend in testlist[person]: if friend not in alreadyscanned: tempdict.update(self.basicscan(friend)) alreadyscanned.append(friend) sleepytime: float = abs(random.randrange(100 - self.delayrand, 100 + self.delayrand) / 100 * self.reqdelay) time.sleep(sleepytime) testlist.update(tempdict) tempdict.clear() self.scanlist.update(testlist) return testlist