程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python crawler series little red book account authorization automatically publishes note videos

編輯:Python

Python Crawler series' little red book account authorization automatically publishes note videos

If you have any questions Click here to contact us <

Please scan the QR code below for wechat

The code is for learning and communication only , Do not use for illegal purposes

Go straight to the code

# -*- coding:utf-8 -*-
import requests
import json
import random
import os
import cv2
from PIL import Image
import time
import configparser
import shutil
import MySQLdb
from RedisUtils import RedisUtils
import threading
from util import getNoteSign,getPostSign,getXSign,getTraceid
retry = 3
timeout = 20
r = RedisUtils()
tmpFilePath = os.getcwd() + "/tmp/"
if not os.path.exists(tmpFilePath):
os.mkdir(tmpFilePath)
cf = configparser.ConfigParser()
try:
cf.read(os.getcwd() + "/conf.ini", encoding="utf-8-sig")
except Exception as e:
print(" Program directory does not exist conf.ini The configuration file ~")
exit(0)
def getConf(sec, key):
try:
return cf.get(sec, key)
except Exception as e:
print(e)
print(" The following configuration is not available :" + sec + " - " + key)
exit(0)
def execSql(sql):
'''
perform sql to update
:param sql:
:return:
'''
try:
conn = MySQLdb.connect(user=mysql_user, host="127.0.0.1", password=mysql_password, database=mysql_database, charset='utf8')
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
conn.close()
return True
except Exception as e:
return False
def querySql(sql):
'''
perform sql Inquire about
:param sql:
:return:
'''
try:
conn = MySQLdb.connect(user=mysql_user, host="127.0.0.1", password=mysql_password, database=mysql_database, charset='utf8')
cursor = conn.cursor()
cursor.execute(sql)
conn.close()
return cursor.fetchall()
except Exception as e:
return None
def queryUserInfoService(appkey, appsecret):
sql = "select a.appkey, a.register_time, b.nums, b.expired_ts from ims_channel_user a inner join ims_channel_allowance b on a.appkey = b.appkey where a.appkey = '%s' and a.use_flag = '1' and a.appsecret = '%s'" % (appkey, appsecret)
res = querySql(sql)
try:
return {
"appkey": res[0][0], "register_time": res[0][1], "nums": res[0][2], "expired_ts": res[0][3]}
except Exception as e:
pass
return
def getHtml(url, headers=None):
for i in range(retry):
try:
if headers is None:
headers = header
res = requests.get(url, headers=headers, timeout=timeout)
return res.json()
except Exception as e:
pass
def postHtml(url, data, headers=None):
for i in range(retry):
try:
if headers is None:
headers = header
res = requests.post(url, data=data, headers=headers, timeout=timeout)
return res.json()
except Exception as e:
pass
def getQr(s):
while True:
try:
resp = os.popen('node qr.js ' + str(s))
if platform.system().lower() == 'windows':
return resp.buffer.read()
elif platform.system().lower() == 'linux':
return resp.buffer.read().decode("utf-8")
except Exception as e:
pass
header = {

"authorization": "",
"content-type": "application/json;charset=UTF-8",
"origin": "https://creator.*******.com",
"referer": "https://creator.*******.com/",
"sec-ch-ua": '"Google Chrome";v="89", "Chromium";v="89", ";Not A Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
"x-b3-traceid": getTraceid(),
}
def getVideoImg(path):
try:
vidcap = cv2.VideoCapture(path)
success, image = vidcap.read()
n = 1
while n < 30:
success, image = vidcap.read()
n += 1
imgPath = tmpFilePath + '123.jpg'
# imag = cv2.imwrite(imgPath, image)
cv2.imencode(".jpg", image)[1].tofile(imgPath)
im = Image.open(imgPath)
return imgPath, im.size[0], im.size[1]
except Exception as e:
return None, None, None
def doPublish(title, content, userId, cover, video, cookie, logid):
traceId = getTraceid()
xsign = getXSign()
header = {

"content-type": "application/json;charset=UTF-8",
"cookie": cookie,
"origin": "https://creator.*******.com",
"referer": "https://creator.*******.com/",
"sec-ch-ua": '"Google Chrome";v="89", "Chromium";v="89", ";Not A Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
"x-b3-traceid": getTraceid(),
}
header['x-sign'] = xsign
url = "https://www.*******.com/fe_api/burdock/?traceId=" + traceId
data = {

"cover": cover,
"video": video,
"title": title,
"content": content,
"topics": [],
"noteSign": getNoteSign(title, content, userId),
"postSign": getPostSign(userId),
"deviceFingerprint": ""
}
res = requests.post(url, headers=header, data=json.dumps(data))
try:
res = res.json()
if str(res['code']) == "0":
updateLogIdStatus(logid, " Successful release ")
else:
updateLogIdStatus(logid, " Publish exception , Abnormal prompt :%s " % str(res))
except Exception as e:
updateLogIdStatus(logid, " Publish exception , Abnormal prompt :%s " % str(res))
def uploadVideo(videoPath):
uploadParams = getVideoUploadParams()
if uploadParams and len(uploadParams) > 0:
url = "https://upload.qiniup.com/"
files = {

"token": (None, uploadParams['token']),
"key": (None, uploadParams['key']),
"fname": (None, os.path.basename(videoPath)),
}
header = {

"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
}
res = requests.post(url, headers=header)
try:
res = res.json()
return {
"vkey": res['key'], "fsize": res['fsize']}
except Exception as e:
pass
def uploadImg(videoPath):
imgs = getVideoImg(videoPath)
uploadParams = getImgUploadParams()
if uploadParams and len(uploadParams) > 0:
url = "https://xhsci-10008268.cos.accelerate.myqcloud.com/"
files = {

"Signature": (None, uploadParams['token']),
"x-cos-security-token": (None, ),
"key": (None, uploadParams['key']),
"file": ("cover.jpeg", open(imgs[0], "rb"), "application/octet-stream", {
}),
}
header = {

"Host": "xhsci-10008268.cos.accelerate.myqcloud.com",
"Origin": "https://creator.*******.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
}
res = requests.post(url, headers=header)
return str(res.status_code) == "204", {
"width": int(imgs[1]), "height": int(imgs[2]), "fileid": uploadParams['key']}
else:
return False, None
def checkFileExists(path):
try:
return os.path.exists(path)
except Exception as e:
return False
def deleteFile(path):
try:
os.remove(path)
except Exception as e:
pass
def checkCreateStatusService(logid):
try:
res = r.get("xhs_log_" + logid)
if res is None:
return False, " no logid by :%s Release progress information for !" % logid
if " Successful release " in res:
return True, res
return False, res
except Exception as e:
return False, " no logid by :%s Release progress information for !" % logid
def uploadVideoSubmiter(a, title, content, videoPath, cookie, logid):
userId = getUserId(cookie)
if userId is None:
updateLogIdStatus(logid, " Failed to get user information , Please check if your login has expired ")
return
if imgStatus:
updateLogIdStatus(logid, " Uploading video ")
if videoInfo is None or len(videoInfo) == 0:
updateLogIdStatus(logid, " Video upload failed , Please try again ")
video = {

"width": cover['width'],
"height": cover['height'],
"fileid": videoInfo['vkey'],
"fsize": videoInfo['fsize'],
"duration": 0,
"frame": {

"ts": 0,
"userSelect": False
}
}
return doPublish(title, content, userId, cover)
else:
updateLogIdStatus(logid, " Failed to upload the cover image , Please try again ")
def uploadVideoService(account, title, content, videoPath):
user = {
}
try:
user = eval(r.get("xhs_" + account))
if len(user) == 0:
return False, " No account :%s" % account
except Exception as e:
return False, " No account :%s" % account
if not checkFileExists(videoPath):
return False, " There is no video :%s" % videoPath
logid = getHash(str(account) + "-" + str(time.time()))
updateLogIdStatus(logid, " Successfully submitted the video publishing task ")
threading.Thread(target=uploadVideoSubmiter, args=(None, title, content, videoPath, user['cookie'], logid)).start()
return True, logid
def getLoginToken():
url = "https://www.*******.com/fe_api/burdock/web/v2/login/by_qrcode/token/generate"
header = {

"authorization": "",
"origin": "https://creator.*******.com",
"referer": "https://creator.*******.com/",
"x-b3-traceid": getRandStr(16),
}
header['x-sign'] = getXSign()
res = postHtml(url, "", header)
try:
return res['data']['token']
except Exception as e:
pass
def checkLoginStatus(a, token, account, notifyurl):
url = "https://www.*******.com/fe_api/burdockby_qrcode/token/validate"
header = {

"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
}
header['x-sign'] = getXSign()
data = {

"token": token,
"device_fingerprint": "deviceId"
}
startTs = int(time.time())
while True:
resp = requests.post(url, data=json.dumps(data), headers=header, timeout=timeout)
res = resp.json()
if str(res['code']) == "0" and str(res['data']['status']) == "0":
try:
key, value = cookie.split("=")
if "xhs-session" == key:
cookieS += "xhs-session=" + value + "; "
elif "xhs-session.sig" in key:
cookieS += "xhs-session.sig=" + value + "; "
except Exception as e:
pass
shutil.copyfile("success.png", os.getcwd() + "/tmp/" + str(account) + ".png")
return
currTs = int(time.time())
if currTs - startTs >= 60:
break
time.sleep(1)
def doNotify(notifyurl, account):
url = notifyurl + "&spn=" + account
getHtml(url)
def loginService(account, notifyurl):
qrCodePath = tmpFilePath + account + ".png"
deleteFile(qrCodePath)
token = getLoginToken1()
if token is None:
return False, " Failed to get QR code "
threading.Thread(target=checkLoginStatus, args=(None, token, account, notifyurl)).start()
return status, apiHost + "/tmp/" + account + ".png"
if __name__ == '__main__':
main()

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved