1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
import requests
import json
import time
from steamrelationships.constants import _Const
# Module-wide constants object; SteamRelationships._checkrequests reads CONST.STEAMAPI_MAXREQ from it.
CONST = _Const()
class SteamRelationships:
    """Fetch Steam friend lists through ISteamUser/GetFriendList with a daily request budget.

    Every request is counted in a JSON bookkeeping file (``reqjson``) shaped like
    ``{"<window start, ns since epoch>": [request_count, [request_time_ns, ...]]}``.
    A new entry is started once more than 24 hours have passed since the start
    of the newest entry.
    """

    # Shared by all instances so repeated queries to Steam's API reuse the same TCP connection.
    session: object = requests.Session()
    # Class-level default kept for backward compatibility; __init__ gives every instance its own list.
    scanlist: list = []
    reqjson: str = "requests.json"

    def __init__(
        self,
        webapikey: str,
        timeout: int = 30,
        reqdelay: float = 0.5,
        delayrand: float = 1.25,
        reqsafetybuffer: float = 0.9,
        reqjson: str = "requests.json",
    ) -> None:
        """Store the API key and request options.

        (str/int) webapikey - Steam dev api key required to use Steam's ISteamUser/GetFriendList interface

        Request Options:
        (int/float) timeout (Default: 30) - Seconds to wait before timing out a request.
        (float) reqdelay (Default: 0.5) - Default amount of seconds to wait between sending each request to Steam.
            NOTE: stored only; no method in this class applies the delay yet.
        (float) delayrand (Default: 1.25) - The max percentage of extra delay to randomly add to each request
            (1 = no randomness, <1 = randomly shorter, >1 = randomly longer).
            NOTE: stored only; no method in this class applies it yet.
        (float) reqsafetybuffer (Default: 0.9) - Highest fraction of the daily request limit
            (CONST.STEAMAPI_MAXREQ) you are willing to use. Per the original author's note, Steam allows
            100,000 requests per day, so 0.9 means at most 90,000 requests per day. Values above 1 never
            raise the cap above CONST.STEAMAPI_MAXREQ.
        (str) reqjson (Default: "requests.json") - The name/filepath of the file to store request times in
        """
        self.webapikey = webapikey
        self.timeout = timeout
        self.reqdelay = reqdelay
        self.delayrand = delayrand
        self.reqsafetybuffer = reqsafetybuffer
        self.reqjson = reqjson
        # Per-instance list so instances do not share (and mutate) the class-level default.
        self.scanlist = []

    def _readjsonfile(self, filepath: str = "") -> dict:
        """Return the parsed contents of the request bookkeeping file.

        ``filepath`` defaults to ``self.reqjson`` when empty. If the file does not
        exist it is created with a single empty entry keyed by the current time,
        and that initial content is returned. Returns ``{}`` when the file cannot
        be read, parsed or created, or when it does not contain a JSON object.
        """
        if not filepath:
            filepath = self.reqjson

        try:
            with open(filepath, "r") as jsonfile:
                contents = json.load(jsonfile)
        except FileNotFoundError:
            print(f"File {filepath} does not exist. Generating empty json file...")
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and text decoding errors.
            print("Other unknown error occured")
            return {}
        else:
            if isinstance(contents, dict):
                return contents
            # Valid JSON but not the expected object; treat it like an unreadable file.
            print("Other unknown error occured")
            return {}

        # The file was missing: seed it with one empty entry. The key is written as a
        # string because that is what JSON object keys become when read back.
        seeded = {str(time.time_ns()): [0, []]}
        try:
            with open(filepath, "w") as newfile:
                json.dump(seeded, newfile, indent=4)
        except OSError:
            print("Couldn't create new file")
            return {}
        return seeded

    def _checkrequests(self, filename: str = "") -> int:
        """Count one request against the daily budget and persist the bookkeeping file.

        Returns the number of requests recorded in the current 24-hour window
        (always > 0) when the request is allowed, ``0`` when the request limit
        for the window has been reached, and ``-1`` when the bookkeeping file
        could not be read, interpreted or written.
        """
        if not filename:
            filename = self.reqjson

        jsoncontents = self._readjsonfile(filename)
        if not jsoncontents:
            # _readjsonfile reports failure by returning an empty dict.
            print(f"Could not get the contents of file {filename}")
            return -1

        nanoseconds_per_day = 24 * 60 * 60 * 10 ** 9
        checktime = time.time_ns()

        # Dicts preserve insertion order, so the last key is the newest entry.
        latestkey = list(jsoncontents)[-1]
        try:
            window_expired = (checktime - int(latestkey)) > nanoseconds_per_day
            if window_expired:
                # Start a new window. The current request is counted in it below, so
                # the first request of a new window is not mistaken for "limit reached".
                latestkey = str(checktime)
                jsoncontents[latestkey] = [0, []]
            entry = jsoncontents[latestkey]
            currentreqs = entry[0]
            # The buffer can only lower the cap; it never exceeds Steam's own limit.
            limit = min(CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer, CONST.STEAMAPI_MAXREQ)
            if currentreqs >= limit:
                print(f"Daily request limit reached ({currentreqs}/{CONST.STEAMAPI_MAXREQ * self.reqsafetybuffer}). Please try again tomorrow, or increase \"reqsafetybuffer\" (currently: {self.reqsafetybuffer})")
                return 0
            entry[0] += 1
            entry[1].append(checktime)
        except (TypeError, ValueError, IndexError, KeyError, AttributeError):
            # The file parsed as JSON but its entries do not have the expected layout.
            print(f"Could not get the contents of file {filename}")
            return -1

        # Update the json file
        try:
            with open(filename, "w") as jsonfile:
                json.dump(jsoncontents, jsonfile, indent=4)
        except OSError:
            print("Could not update json file")
            return -1

        return entry[0]

    def _getFriendsList(self, steamid64: str = None) -> dict:
        """Request the friend list of ``steamid64`` from Steam.

        Returns the decoded JSON response on HTTP 200. Returns ``{}`` when
        ``steamid64`` is blank, and ``None`` when the request budget check does
        not allow the request or Steam answers with any other status code.
        Exceptions raised by ``requests`` (timeouts, connection errors) are not
        caught here and propagate to the caller.
        """
        # example url: http://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&steamid=76561197960435530&relationship=friend
        if not steamid64:
            print("Requested id must not be blank")
            return {}

        url = "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/"
        options = {"key": self.webapikey, "steamid": steamid64, "relationship": "friend"}

        # TODO: the original author flagged this request path as fragile; harden it.
        # Only send the request once it has been counted against the daily budget.
        if self._checkrequests() <= 0:
            return None

        # GET should be as secure as POST because ssl is being used
        response = self.session.get(url, params=options, timeout=self.timeout)

        # TODO: Implement proper error checking so that a private friends list is reported distinctly
        if response.status_code == requests.codes.ok:
            return response.json()
        return None

    def parseFriendsList(self, friendslist: dict = None) -> list:
        """Extract the steam ids from a GetFriendList response.

        Expects ``friendslist['friendslist']['friends']`` to be a list of dicts
        that each carry a ``'steamid'`` key. Returns ``None`` when
        ``friendslist`` is empty or ``None``.
        """
        if not friendslist:
            return None
        return [friend['steamid'] for friend in friendslist['friendslist']['friends']]

    # Do a basic scan for a single id
    def basic_scan(self, id: str) -> list:
        """Return the list of friend steam ids for ``id``, or ``None`` if it could not be fetched."""
        return self.parseFriendsList(self._getFriendsList(id))

    def recursive_scan(self, startid: str) -> dict:
        """Scan ``startid`` and then each of its friends (one level deep).

        Returns a dict mapping every scanned steam id to the result of
        ``basic_scan`` for that id (a list of friend ids, or ``None`` when that
        scan failed). ``startid`` is always the first key.
        """
        scans: dict = {startid: self.basic_scan(startid)}
        alreadyscanned: set = {startid}

        # A failed first scan yields None; treat it as "no friends" rather than crashing.
        for friend in scans[startid] or []:
            if friend in alreadyscanned:
                continue
            alreadyscanned.add(friend)
            scans[friend] = self.basic_scan(friend)

        return scans
|