David 借用楼主的源代码,用通义千问改了一下,这是云函数的版本:
请求处理程序填:main.handler
本人非计算机专业的,只是本着能直接用的想法,各位大佬手下留情。
已测试可用
import json
import requests
import re
def get_cookies():
    """Load the cookie mapping stored under the ``"cookies"`` key of data.json.

    Returns:
        The dict found at ``data["cookies"]`` on success. On any failure
        (file missing, invalid JSON, key absent, value not a JSON object)
        a dict of the form ``{"error": message}`` is returned instead of
        raising, so the caller can test for the ``"error"`` key.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open("data.json", "r", encoding="utf-8") as cookie_file:
            cookies = json.load(cookie_file)["cookies"]
    except Exception as exc:
        # Boundary of the cloud function: report the failure as data.
        return {"error": str(exc)}
    if not isinstance(cookies, dict):
        # The caller performs `'error' in cookies` and key lookups, which are
        # only meaningful on a mapping.
        return {"error": "cookies in data.json must be a JSON object"}
    return cookies
def get_token(session, cookies):
    """Fetch the site home page and extract the CSRF token and user id.

    Args:
        session: Object exposing ``get(url, cookies=...)`` whose result has a
            ``text`` attribute (a ``requests.Session`` in this file).
        cookies: Cookie mapping forwarded with the request.

    Returns:
        A ``(token_info, error)`` pair. On success ``token_info`` is
        ``{"csrf_token": ..., "user_id": ...}`` and ``error`` is ``None``.
        On failure ``token_info`` is ``None`` and ``error`` is
        ``{"error": message}``.
    """
    url = "https://invites.fun/"
    response = session.get(url, cookies=cookies)
    html_content = response.text

    csrf_token_pattern = r'"csrfToken":"([a-zA-Z0-9]+)"'
    user_id_pattern = r'"userId":([a-zA-Z0-9]+)'
    user_id_match = re.search(user_id_pattern, html_content)
    csrf_match = re.search(csrf_token_pattern, html_content)

    # Errors always travel in the second slot so the caller's `if error:`
    # check sees them; the user-id failure used to be returned in the first.
    if user_id_match is None:
        return None, {"error": "User ID not found."}
    if csrf_match is None:
        return None, {"error": "CSRF Token not found."}

    token_info = {
        "csrf_token": csrf_match.group(1),
        "user_id": user_id_match.group(1),
    }
    return token_info, None
def sign_in_invites(session, cookies):
    """Send the daily check-in request for the logged-in user.

    Args:
        session: Session object used for the token lookup and the POST.
        cookies: Mapping that must contain ``flarum_remember`` and
            ``flarum_session``; both are placed in the ``cookie`` header.

    Returns:
        ``{"message": ...}`` when the server answered with status 200, or
        ``{"error": ...}`` when the token lookup failed or the status code
        was anything else.
    """
    token_info, error = get_token(session, cookies)
    if error:
        return error
    # Guard against an error payload arriving in the first tuple slot.
    if "error" in token_info:
        return token_info

    csrf_token = token_info['csrf_token']
    user_id = token_info['user_id']
    check_in_url = f"https://invites.fun/api/users/{user_id}"

    # NOTE(review): "authority"/"method"/"path"/"scheme" look like HTTP/2
    # pseudo-headers copied from browser devtools; kept as-is.
    # The "sec-ch-ua-mobile", "sec-fetch-dest" and "user-agent" entries had
    # been corrupted with non-ASCII filler text, which cannot be encoded into
    # an HTTP header; they are restored to their standard spellings.
    headers = {
        "authority": "invites.fun",
        "method": "POST",
        "path": f"/api/users/{user_id}",
        "scheme": "https",
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "zh-CN,zh;q=0.9",
        "content-type": "application/json; charset=UTF-8",
        "cookie": f"flarum_remember={cookies['flarum_remember']}; flarum_session={cookies['flarum_session']}",
        "dnt": "1",
        "origin": "https://invites.fun",
        "referer": "https://invites.fun/",
        "sec-ch-ua": "\"Google Chrome\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"",
        "sec-ch-ua-mobile": "?1",
        "sec-ch-ua-platform": "\"Android\"",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "sec-gpc": "1",
        "user-agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36",
        "x-csrf-token": csrf_token,
        # The request is POSTed but asks the server to treat it as PATCH.
        "x-http-method-override": "PATCH",
    }
    data = {
        "data": {
            "type": "users",
            "attributes": {
                "canCheckin": False,
                "totalContinuousCheckIn": 1,
            },
            "id": user_id,
        }
    }

    response = session.post(check_in_url, json=data, headers=headers)
    if response.status_code != 200:
        return {"error": f"请求失败,状态码: {response.status_code}"}

    attributes = response.json()['data']['attributes']
    can_checkin = attributes.get('canCheckin', None)
    total_checkins = attributes.get('totalContinuousCheckIn', None)
    money = attributes.get('money', None)

    # Success is reported only when the server echoes canCheckin == False.
    if can_checkin is False:
        return {"message": f"签到成功,连续签到次数:{total_checkins},药丸数量:{money}"}
    return {"message": "签到失败或已经签到过了"}
def handler(event, context):
    """Cloud-function entry point: load cookies and perform the check-in.

    Args:
        event: Invocation payload supplied by the platform (not read here).
        context: Invocation context supplied by the platform (not read here).

    Returns:
        A ``(body, status)`` tuple: ``({"message": ...}, 200)`` on success,
        ``({"error": ...}, 500)`` when loading cookies or signing in failed.
    """
    # Load cookies first so no session is created when they are unavailable.
    cookies = get_cookies()
    if 'error' in cookies:
        return {"error": cookies['error']}, 500

    # Use the session as a context manager so its connections are released
    # when the invocation ends instead of lingering until garbage collection.
    with requests.Session() as session:
        result = sign_in_invites(session, cookies)

    if 'error' in result:
        return {"error": result['error']}, 500
    return {"message": result['message']}, 200
// Page-embedded snippet (not part of the Python script): when the page exposes
// hljsLoader and this script's parent node lacks the
// data-s9e-livepreview-onupdate attribute, highlight the parent's code blocks.
if (window.hljsLoader && !document.currentScript.parentNode.hasAttribute('data-s9e-livepreview-onupdate')) {
    window.hljsLoader.highlightBlocks(document.currentScript.parentNode);
}