import requests import json import time from steamrelationships.constants import _Const CONST = _Const() class SteamRelationships: session: object = requests.Session() # Session object so repeated querries to steam's api use the same TCP connection scanlist: list = [] # To be populated by recurse() reqjson: str = "requests.json" def __init__(self, webapikey: str, timeout: int = 30, reqdelay: float = 0.5, delayrand: float = 1.25, reqsafetybuffer: float = 0.9, reqjson: str = "requests.json") -> None: """ (str/int) webapikey - Steam dev api key required to use Steam's ISteamUser/GetFriendList interface Request Options: (int/float) timeout (Default: 30) - Seconds to wait before timing out a request. (float) reqdelay (Default: 0.5) - Default amount of seconds to wait between sending each request to Steam (float) delayrand (Default: 1.25) - The max percentage of extra delay to randomly add to each request (1 = no randomness, <1 = randomly shorter, >1 = randomly longer) (float) reqsafetybuffer (Default: 0.9) - Highest percent of Steam's API request limit you are willing to run > Steam has an API request limit of 100,000 requests per day. reqsafetybuffer = 0.9 means SteamRequests will send a max of 90,000 requests per day. Entering a number higher than 1 will not result in sending more than 100,000 requests per day (str) reqjson (Default: "requests.json") - The name/filepath of the file to store request times in """ # I can't make this a formatted string :sob: self.webapikey = webapikey self.timeout = timeout self.reqdelay = reqdelay self.delayrand = delayrand self.reqsafetybuffer = reqsafetybuffer self.reqjson = reqjson def _readjsonfile(self, filepath: str = "") -> dict: if not filepath: filepath = self.reqjson final: dict = {} # Try to read the contents of the given file try: with open(filepath, "r+") as jsonfile: final = json.load(jsonfile) jsonfile.close() # If the file does not exist, create one and slap an empty list in it except FileNotFoundError: print(f"File {filepath} does not exist. Generating empty json file...") try: with open(filepath, "w+") as newfile: json.dump({time.time_ns(): [0, []]}, newfile, indent=4) newfile.close() return self._readjsonfile(filepath) except: print("Couldn't create new file") return {} except: print("Other unknown error occured") return {} return final def _checkrequests(self, filename: str = "") -> int: if not filename: filename = self.reqjson # Get the contents of the specified file jsoncontents: dict = {} try: jsoncontents = self._readjsonfile(filename) except: print(f"Could not get the contents of file {filename}") return -1 # Check the current date. If over 1 day since last entry, add a new entry. Otherwise, edit the current day's entry [note, 1 day in nanoseconds = (8.64 * (10 ** 13)) ] checktime = time.time_ns() if (checktime - int(list(jsoncontents.keys())[-1])) > (8.64 * (10 ** 13)): jsoncontents[checktime] = [0, []] else: # This bullshit brought to you by: ordered dictionaries currentreqs = jsoncontents[list(jsoncontents.keys())[-1]][0] if currentreqs < (CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer) and currentreqs < CONST.STEAMAPI_MAXREQ: jsoncontents[list(jsoncontents.keys())[-1]][0] += 1 jsoncontents[list(jsoncontents.keys())[-1]][1].append(checktime) else: print(f"Daily request limit reached ({jsoncontents[list(jsoncontents.keys())[-1]][0]}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})") return 0 # Update the json file try: with open(filename, "w+t") as jsonfile: json.dump(jsoncontents, jsonfile, indent=4) jsonfile.close() except: print("Could not update json file") return -1 return jsoncontents[list(jsoncontents.keys())[-1]][0] def _getFriendsList(self, steamid64: str = None) -> dict: # example url: http://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&steamid=76561197960435530&relationship=friend if not steamid64: print("Requested id must not be blank") return # Format url and make a request url = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/" options = {"key": self.webapikey, "steamid": steamid64, "relationship": "friend"} # BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING # # BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING # # BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING BIG WARNING # # TODO: FIX THIS SO IT DOESN"T EXPLODE EVERYTHING ALL THE TIME if self._checkrequests() > 0: response = self.session.get(url, params=options, timeout=self.timeout) # GET should be as secure as POST because ssl is being used else: return None # TODO: Implement proper error checking so that this doesn't just break if someone has a private friends list if response.status_code == requests.codes.ok: return response.json() return None def parseFriendsList(self, friendslist: dict = None) -> list: if not friendslist: return None final = [] for friend in friendslist['friendslist']['friends']: final.append(friend['steamid']) return final def basic_scan(self, startid: str = None) -> list: # Scan an initial id, then populate a list with the scans of each friend scans: dict = {} alreadyscanned: list = [] # Start the scan and collect the first user's friend list scans[startid] = self.parseFriendsList(self._getFriendsList(startid)) alreadyscanned.append(startid) ''' # Scan the current scanid's friends and append them to the list for friend in scans[startid]: if friend not in alreadyscanned: scans[friend] = self.parseFriendsList(self._getFriendsList(friend)) alreadyscanned.append(friend) ''' return scans