整个程序分了三个模块:购票模块(主体)、验证码处理模块、余票查询模块
使用方法:三个模块分别保存为三个python文件,名字分别为:book_ticket,captcha,check_ticket。
在购票模块里初始化相关参数:出发站、终点站、出发日期、自己的账号、密码,乘客姓名等,出发日期的格式:xxxx-xx-xx。
购票模块:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import wait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.Exceptions import NoSuchElementException
import json
import time
from check_ticket import Check#余票查询模块
from captcha import Code#验证码处理模块
class Buy_Ticket():
def __init__(self, start_station, end_station, date, username, password, purpose, names):
self.num = 1
self.start = start_station
self.end = end_station
self.date = date
self.username = username
self.password = password
self.purpose = purpose
self.all_names = names
self.login_url = 'https://kyfw.12306.cn/otn/resources/login.html'
self.ticket_url = 'https://kyfw.12306.cn/otn/leftTicket/init'
#模拟登录函数,包括自动填充用户名、密码、自动点击验证、最终自动登录
def login(self):
browser.get(self.login_url)
time.sleep(0.5)
try:
wait.WebDriverWait(browser, 5).until(EC.element_to_be_clickable((By.CLASS_NAME,'login-hd-account'))).click()
input_name = browser.find_element_by_id('J-userName')
input_pd = browser.find_element_by_id('J-password')
input_name.send_keys(self.username)
input_pd.send_keys(self.password)
start_end, check = self.add_cookie()
#time.sleep(5)
c = Code(browser) #调用验证码识别模块
c.main()
print('登录成功!')
time.sleep(0.8)
print(browser.current_url)
#try:
#wait.WebDriverWait(browser, 3).until(EC.element_to_be_clickable((By.CLASS_NAME,'btn.btn-primary.ok'))).click()
#except NoSuchElementException:
#pass
self.check(start_end, check)
except NoSuchElementException:
print('没有找到元素')
self.login()
def add_cookie(self):
check = Check(self.date, self.start, self.end, self.purpose)
start_end = check.look_up_station()
# cookie的添加,json.dumps把以汉字形式呈现的起始、终点站转化成unicode编码,可在审查元素里查看cookie
browser.add_cookie({'name': '_jc_save_fromStation',
'value': json.dumps(self.start).strip('"').replace('\\', '%') + '%2C' + start_end[0]})
browser.add_cookie({'name': '_jc_save_toStation',
'value': json.dumps(self.end).strip('"').replace('\\', '%') + '%2C' + start_end[1]})
browser.add_cookie({'name': '_jc_save_fromDate', 'value': self.date})
print('cookie添加完成!')
return start_end, check
#余票查询函数,获取预定车次信息
def check(self, start_end, check):
#调用余票查询模块
browser.get(self.ticket_url)
self.num = check.get_info(start_end, 1)
button = wait.WebDriverWait(browser, 3).until(EC.element_to_be_clickable((By.ID,'query_ticket')))
button.click()
if self.purpose == '学生':
browser.find_element_by_id('sf2').click()
button.click()
def check_date(self):
#12306学生票的时间是:暑假:6月1日-9月30日,寒假:12月1日-3月31日
date = ''.join(self.date.split('-'))
#暑假
if int(date[:4] + '0601') <= int(date) <= int(date[:4] + '0930'):
return 1
#当年寒假,也就是当年的1、2、3月
if int(date[:4] + '0101') <= int(date) <= int(date[:4] + '0331'):
return 1
#这里处理的是从当年12月到第二年的3月,比如在2020-12-12买2021-01-18的学生票,那么就是在下面的处理区间
next_year = str(int(date[:4]) + 1)
if int(date[:4] + '1201') <= int(date) <= int(next_year + '0331'):
return 1
return 0
#车票预定函数
def book_ticket(self):
time.sleep(1.5)
print('开始预订车票...')
#先查找出所有车次对应的预订按钮,再根据余票查询模块返回的车次序号,点击相应的预订按钮
button = browser.find_elements_by_class_name('no-br')
button[self.num-1].click()
time.sleep(1.5)
#选择乘车人
#获取所有乘车人的信息
passengers = browser.find_element_by_id('normal_passenger_id')
names = passengers.text.split('\n')
for name in self.all_names:
index = names.index(name)
browser.find_element_by_id('normalPassenger_' + str(index)).click()
if '学生' in name:
if self.check_date():
browser.find_element_by_id('dialog_xsertcj_ok').click()
else:
print('当前日期不在学生票可购买时间区间!')
print('学生票乘车时间为暑假6月1日至9月30日、寒假12月1日至3月31日!')
browser.find_element_by_id('dialog_xsertcj_cancel').click()
#browser.close()
browser.find_element_by_id('submitOrder_id').click()
wait.WebDriverWait(browser, 3).until(EC.element_to_be_clickable((By.ID,'qr_submit_id'))).click()
print('车票预定成功!请在30分钟内完成付款!')
def main(self):
self.login()
if self.num:
self.book_ticket()
else:
browser.close()
return
if __name__ == '__main__':
begin = time.time()
#隐藏浏览器
options = webdriver.ChromeOptions()
options.add_argument("--disable-blink-features=AutomationControlled") #2020.03.28更新
browser = webdriver.Chrome(options=options)
browser.maximize_window()
# Buy_Ticket类初始化参数,从左到右:出发站,终点站,出发日期,账号,密码,购票类型(默认购买成人票,若要购买学生票,
# 添加乘客姓名时在后面加上(学生)),把要购买票的乘客姓名放在一个列表里
b = Buy_Ticket('上海', '重庆', '2020-12-30', '账号', '密码', 'ADULT', ['乘客1姓名', '乘客2姓名(学生)'])
b.main()
end = time.time()
print('总耗时:%d秒' % int(end-begin))
#browser.close()
验证码处理模块:
import time
import base64
import requests
import numpy as np
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.support import wait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
class Code():
def __init__(self, browser):
self.browser = browser
self.verify_url = 'http://littlebigluo.qicp.net:47720/' #验证码识别网址,返回识别结果
#获取验证码图片
def get_captcha(self):
element = self.browser.find_element_by_class_name('imgCode')
#time.sleep(0.5)
img = base64.b64decode(element.get_attribute('src')[len('data:image/jpg;base64,'):])
with open('captcha.png', 'wb') as f:
f.write(img)
#验证码解析
def parse_img(self):
pic_name = 'captcha.png'
# 打开保存到本地的验证码图片
files={'pic_xxfile':(pic_name,open(pic_name,'rb'),'image/png')}
response = requests.post(self.verify_url, files=files)
try:
num = response.text.split('<B>')[1].split('<')[0]
except IndexError: #验证码没识别出来的情况
print('验证码未能识别!重新识别验证码...')
return
try:
if int(num):
print('验证码识别成功!图片位置:%s' % num)
return [int(num)]
except ValueError:
try:
num = list(map(int,num.split()))
print('验证码识别成功!图片位置:%s' % num)
return num
except ValueError:
print('验证码未能识别')
return
#识别结果num都以列表形式返回,方便后续验证码的点击
#还有可能验证码没能识别出来
#实现验证码自动点击
def move(self):
num = self.parse_img()
if num:
try:
element = self.browser.find_element_by_class_name('loginImg')
for i in num:
if i <= 4:
ActionChains(self.browser).move_to_element_with_offset(element,40+72*(i-1),73).click().perform()
else :
i -= 4
ActionChains(self.browser).move_to_element_with_offset(element,40+72*(i-1),145).click().perform()
self.browser.find_element_by_class_name('login-btn').click()
self.slider()
except:
print('元素不可选!')
else:
self.browser.find_element_by_class_name('lgcode-refresh').click() #刷新验证码
time.sleep(1.5)
self.main()
def ease_out_quart(self, x):
return 1 - pow(1 - x, 4)
#生成滑动轨迹
def get_tracks(self, distance, seconds, ease_func):
tracks = [0]
offsets = [0]
for t in np.arange(0.0, seconds, 0.1):
ease = ease_func
offset = round(ease(t / seconds) * distance)
tracks.append(offset - offsets[-1])
offsets.append(offset)
return tracks
#处理滑动验证码
def slider(self):
print('开始处理滑动验证码...')
#track = self.get_tracks(305, 1, self.ease_out_quart)
track = [30, 50, 90, 140] #滑动轨迹可随意,只要距离大于300
try:
slider = wait.WebDriverWait(self.browser, 5).until(
EC.presence_of_element_located((By.CLASS_NAME, 'nc_iconfont.btn_slide')))
ActionChains(self.browser).click_and_hold(slider).perform()
for i in track:
ActionChains(self.browser).move_by_offset(xoffset=i, yoffset=0).perform()
except:
print('验证码识别错误!等待验证码刷新,重新识别验证码...')
time.sleep(2.1) #验证码刷新需要2秒
self.main()
def main(self):
self.get_captcha()
self.move()
余票查询模块:
import requests
from urllib.parse import urlencode
import time
import json
class Check():
def __init__(self, date, start, end, purpose):
self.base_url = 'https://kyfw.12306.cn/otn/leftTicket/query?'
self.url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.9018'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36',
'Cookie': 'JSESSIONID=B709F9775E72BDED99B2EEBB8CA7FBB9; BIGipServerotn=1910046986.24610.0000; RAIL_EXPIRATION=1579188884851; RAIL_DEVIC'
}
self.date = date
self.start_station = start
self.end_station = end
self.purpose = purpose if purpose == 'ADULT' else '0X00'
#查找出车站的英文简称,用于构造cookie、完整的余票查询链接
def look_up_station(self):
try:
with open('station_code.json', 'r') as f:
dic = json.load(f)
except FileNotFoundError:
response = requests.get(self.url).text
station = response.split('@')[1:]
dic = {}
for each in station:
i = each.split('|')
dic[i[1]] = i[2]
with open('station_code.json', 'w') as f:
f.write(json.dumps(dic))
return [dic[self.start_station], dic[self.end_station]]
def get_info(self, start_end, check_count):
#构造请求参数
data = {
'leftTicketDTO.train_date': self.date,
'leftTicketDTO.from_station': start_end[0],
'leftTicketDTO.to_station': start_end[1],
'purpose_codes': self.purpose
}
url = self.base_url + urlencode(data)
print('完整余票查询链接:', url)
count = 0 # 用于对车次编号
while count == 0:
print('余票查询中... %d次' % check_count)
response = requests.get(url, headers=self.headers)
#print(response.text)
try:
json = response.json()
except ValueError:
print('余票查询链接有误,请仔细检查!')
return
maps = json['data']['map']
for each in json['data']['result']:
count += 1
s = each.split('|')[3:]
info = {
'train':s[0],
'start_end':maps[s[3]] + '-' + maps[s[4]],
'time':s[5] + '-' + s[6],
'历时':s[7],
'一等座':s[28],
'二等座':s[27]
}
try:
#余票的结果有3种:有、一个具体的数字(如:18、6等)、无,判断如果余票是有或者一个具体的数字就直接输出对应的车次信息,然后返回
if info['二等座'] == '有' or int(info['二等座']):
print('预定车次信息如下:')
print('[%d]' % count, info)
return count
except ValueError:
continue
count = 0
check_count += 1
time.sleep(0.8)
测试结果:
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。