被春运逼的自己写个脚本监控车票

春运

虽然过年只有7天假,可是家还是要回的。
本来用的是微信和智行抢票,折腾好久还没买到,就试试看手动抢吧,不是大佬不会自动登陆,只能监控一下有没有票弹出浏览器人肉抢票了。
记得很多大佬做过查询火车票的工具,我都没看过,自己从0开始做的。用了两个第三方包,requestsprettytable
还有一个station_names是获取车站代码到车站中文的函数。都在压缩包里了。
easy_ticket.zip

尝试开发

我试过手动登陆之后获取cookies放脚本里,可是很快就过期了,12306的验证码也根本是人都看不懂的,不知道大佬们有没有自动登陆的好办法,如果有一定请教一下。
另外json接口里面有很多不知道是什么意思,不清楚意义我都写成了xcode

windows下面执行页面很小,需要改cmd属性里面的窗口大小。

效果

prettytable这个包的效果还是很不错的,设置title,输入数据,设置排序,就完成了,非常简单好用的一个格式化输出库。
微信截图_20180125170320.png

代码

#-*- coding:utf-8 -*-
import requests
import time
import re
import os
import station
from prettytable import PrettyTable

class Ticket:
    def __init__(self, station):
        #搜索条件列表
        self.config = []
        self.search_url = ''
        #查询间隔
        self.sleep_sec = 10
        self.station = station
        #创建会话对象
        self.s = requests.Session()
        #超时时间
        self.s.timeout = 5

        #设置表头
        self.table = PrettyTable([
            '状态',
            '车次',
            '出发站',
            '到达站',
            '出发时间',
            '到达时间',
            '历时',
            '有票',
            '无座',
            '硬座',
            '硬卧',
            '软卧',
            '一等座',
            '二等座',
            '商务座',
        ])
    def main(self):
        count = 0
        while True:
            count += 1
            print('第{}次查询,{}秒后再次查询'.format(count, self.sleep_sec))
            self.main_loop()
            print()
            #延时重启
            time.sleep(self.sleep_sec)

    def addsearch(self, form, preg_filter, time_filter):
        self.config.append({
            #查询参数配置
            'form':form,
            #过滤
            'filter':preg_filter,
            #时间过滤
            'time_filter':time_filter
        })

    def generate_url(self, form):
        if self.search_url == '':
            self.search_url = "https://kyfw.12306.cn/otn/leftTicket/query?leftTicketDTO.train_date={}&leftTicketDTO.from_station={}&leftTicketDTO.to_station={}&purpose_codes={}".format(
                form['train_date'],
                station.name2id(form['from_station']),
                station.name2id(form['to_station']),
                form['purpose_codes']
            )
        #print(url)
        return self.search_url
    def main_loop(self):
        #遍历所有配置项
        for config_item in self.config:
            self.table.clear_rows()

            response = self.s.get(self.generate_url(config_item['form']))

            if response.status_code != 200:
                print('网络错误,status_code:{}'.format(response.status_code))
                continue
            if response.text.find('{"data":{"flag"') != 0:
                print('页面错误.')
                continue
            try:
                response = response.json().get('data').get('result')
            except:
                with open('./json.log','w') as file:
                    file.write(response.text)
                exit()
            for item in self.format_train(response):
                if self.filter_train(config_item, item):
                    self.table.add_row([
                        item['state'],
                        item['train_id'],
                        item['search_start'],
                        item['search_stop'],
                        item['time_in'],
                        item['time_out'],
                        item['time_travel'],
                        '有票' if item['have_ticket']=='Y' else '没票',
                        item['无座'],
                        item['硬座'],
                        item['硬卧'],
                        item['软卧'],
                        item['一等座'],
                        item['二等座'],
                        item['商务座'],
                    ])
                    
                    if self.train_buy_check(config_item, item):
                        print(self.table)
                        exit()
            self.table.reversesort = False
            self.table.sortby = '出发时间'
            print(self.table)
    #买票过滤
    def train_buy_check(self, config, item):
        #如果可购买,打开浏览器提醒买票
        if item['have_ticket'] != 'Y':
            return
        flag = False
        for col_name in config['filter']:
            if isinstance(config['filter'][col_name],int) and config['filter'][col_name]==1:
                if item[col_name] != '无' and item[col_name] != '--' and item[col_name] != '':
                    note = '{}/{}/{}/{}'.format(
                            config['form']['train_date'],
                            item['search_start'],
                            item['search_stop'],
                            item['train_id']
                        )
                    print(note)
                    os.system('chrome https://kyfw.12306.cn/otn/leftTicket/init?notice=' + note)
                    return True
        return False
    #展示过滤
    def filter_train(self, config, item):
        #遍历正则过滤
        for col_name in config['filter']:
            if isinstance(config['filter'][col_name],list) and config['filter'][col_name] != []:
                #任何一项匹配
                flag = False
                for preg in config['filter'][col_name]:
                    if re.match(preg, item[col_name]):
                        flag = True
                        break
                if not flag:
                    return False

        #发车时间过滤
        for time_item in config['time_filter']:
            if config['time_filter'][time_item]:
                tmp = time_item.split('-')
                if item['time_in'] < tmp[0] or item['time_in'] > tmp[1]:
                    return False
        return True

    def format_train(self, train_list):
        for train in train_list:
            info = train.split('|')
            yield {
                'xcode1':info[0],
                'state':info[1], # 状态 预定
                'train_no':info[2],
                'train_id':info[3], # 车次 T3037
                'train_start':self.station.id2name(info[4]), # 始发站
                'train_stop':self.station.id2name(info[5]), # 终点站
                'search_start':self.station.id2name(info[6]), # 购买起点站
                'search_stop':self.station.id2name(info[7]), # 购买到达站
                'time_in':info[8], # 出发时间
                'time_out':info[9], # 到达时间
                'time_travel':info[10], # 历经时间
                'have_ticket':info[11], # 是否可购买 Y
                'xcode3':info[12], # 
                'train_begin_date':info[13], # 车次发车日期
                'xcode4':info[14], # 
                'xcode5':info[15], # 
                'train_sort_begin':info[16], # 站序排列起始
                'train_sort_end':info[17], # 站序排列结束
                'xcode6':info[18], # 
                'xcode7':info[19], # 
                'xcode8':info[20], # 
                'xcode9':info[21], # 
                'xcode10':info[22], # 
                '软卧':info[23], # 软卧数量
                'xcode11':info[24], # 
                'xcode12':info[25], # 
                '无座':info[26], # 无座数量
                'xcode13':info[27], # 
                '硬卧':info[28], # 硬卧数量
                '硬座':info[29], # 硬座数量
                '二等座':info[30], # 二等座
                '一等座':info[31], # 一等座
                '商务座':info[32], # 商务座
                'xcode14':info[33], # 
                'xcode15':info[34], # 
                'xcode16':info[35], # 
                'exchange':info[36] # 可兑换 [1,0]
            }

if __name__ == '__main__':
    station = station.Station()
    ticket = Ticket(station)
    #回家车票
    #
    ticket.addsearch({
        'train_date':'2018-06-15',
        'from_station':'北京西',
        'to_station':'邢台',
        'purpose_codes':'ADULT'
    },{
        #'state':[],
        #'train_id':[],
        #'have_ticket':['Y','N'],
        '无座':0,
        '硬座':0,
        '硬卧':0,
        '软卧':0,
        '一等座':0,
        '二等座':0,
        '商务座':0,
    },{
        '00:00-23:59':0,
        '00:00-06:00':0,
        '06:00-12:00':0,
        '12:00-18:00':0,
        '18:00-23:59':1,
    })
    #回北京车票
    # ticket.addsearch({
        # 'train_date':'2018-02-21',
        # 'from_station':'西安',
        # 'to_station':'北京',
        # 'purpose_codes':'ADULT'
    # },{
        # 'search_start':[],
        # 'state':[],
        # 'train_id':['^(G|K).*'],
        # 'time_in':[],
        # 'time_out':['[^0]{2}:\d\d'],
        # 'have_ticket':['Y','N'],
        # 'wuzuo':[],
        # 'yingzuo':[],
        # '2dengzuo':[],
    # },{
        # 'time_in_min':'09:55',
        # 'time_in_max':'19:19',
        # 'time_out_max':'21:00'
    # })
    ticket.main()


2018.2.11更新
有大佬给了我测试框架自动抢票的代码,我改了改

from splinter.browser import Browser
from time import sleep
import traceback
import os

class Buy_Tickets(object):
    # 定义实例属性,初始化
    def __init__(self, username, passwd, order, passengers, dtime, starts, ends):
        self.username = username
        self.passwd = passwd
        # 车次,0代表所有车次,依次从上到下,1代表所有车次,依次类推
        self.order = order
        # 乘客名
        self.passengers = passengers
        # 起始地和终点
        self.starts = starts
        self.ends = ends
        # 日期
        self.dtime = dtime
        # self.xb = xb
        # self.pz = pz
        self.login_url = 'https://kyfw.12306.cn/otn/login/init'
        self.initMy_url = 'https://kyfw.12306.cn/otn/index/initMy12306'
        self.ticket_url = 'https://kyfw.12306.cn/otn/leftTicket/init'
        self.driver_name = 'chrome'
        self.executable_path = 'C:\python3\Scripts\chromedriver_win32\chromedriver.exe'
    # 登录功能实现
    def login(self):
        self.driver.visit(self.login_url)
        sleep(5)
        while True:
            if self.driver.url != self.login_url:
                self.driver.visit(self.login_url)
                sleep(2)
            else:
                break
        self.driver.fill('loginUserDTO.user_name', self.username)
        # sleep(1)
        self.driver.fill('userDTO.password', self.passwd)
        # sleep(1)
        print('请输入验证码...')
        while True:
            if self.driver.url == self.ticket_url:
                sleep(2)
                break
            else:
                sleep(2)
                
    # 买票功能实现
    def start_buy(self):
        self.driver = Browser(driver_name=self.driver_name, executable_path=self.executable_path)
        #窗口大小的操作
        self.driver.driver.set_window_size(800, 600)
        self.login()
        print('login done')
        # self.driver.visit(self.ticket_url)
        # exit()
        try:
            print('开始购票...')
            # 加载查询信息
            self.driver.cookies.add({"_jc_save_fromStation": self.starts})
            self.driver.cookies.add({"_jc_save_toStation": self.ends})
            self.driver.cookies.add({"_jc_save_fromDate": self.dtime})
            self.driver.reload()
            print('reload done')
            sleep(1)
            count = 0
            #!!!!!!!!!!!!!!!!!!!
            #设置车次类型
            #123456分别对应 GC-高铁/城际 D-动车 Z-直达 T-特快 K-快速 其他
            #self.driver.find_by_xpath('//ul[@id="_ul_station_train_code"]/li[1]/input')[0].click()
            #!!!!!!!!!!!!!!!!!!!
            while self.driver.url == self.ticket_url:
                self.driver.find_by_text('查询').click()
                count += 1
                print('第%d次点击查询...' % count)
                try:
                    train_list = [x.text for x in self.driver.find_by_xpath('//tbody[@id="queryLeftTable"]/tr/td/div/div/div/a')]
                    for item in train_list:
                        #是否在目标列表
                        if item in self.order:
                            print(item,end=',')
                            #是否有指定票 二等座
                            if self.driver.find_by_xpath('//tbody[@id="queryLeftTable"]/tr')[train_list.index(item)].find_by_xpath('td[4]').text[0] not in ('无','--'):
                                self.driver.find_by_text('预订')[train_list.index(item)].click()
                                sleep(0.05)
                                #是否进入成功预定 跳出
                                if self.driver.url == self.ticket_url:
                                    sleep(1)
                                    break
                        else:
                            continue

                except Exception as e:
                    print(e)
                    print('预订失败...')
                    continue
            print('开始预订...')
            sleep(1)
            print('开始选择用户...')
            for p in self.passengers:
                self.driver.find_by_text(p).last.click()
                sleep(0.5)
                if p[-1] == ')':
                    self.driver.find_by_id('dialog_xsertcj_ok').click()
            print('提交订单...')
            print('测试停止')
            # sleep(1)
            # self.driver.find_by_text(self.pz).click()
            # sleep(1)
            # self.driver.find_by_text(self.xb).click()
            # sleep(1)
            self.driver.find_by_id('submitOrder_id').click()
            sleep(2)
            print('确认选座...')
            self.driver.find_by_id('qr_submit_id').click()
            print('预订成功...')
        except Exception as e:
            print(e)


if __name__ == '__main__':
    # 用户名
    username = 'username'
    # 密码
    password = 'password'
    # 车次选择,0代表所有车次
    order = [
        'G65','G429','G659','G67','G491','G6733'
    ]
    # 乘客名,比如passengers = ['丁小红', '丁小明']
    # 学生票需注明,注明方式为:passengers = ['丁小红(学生)', '丁小明']
    passengers = ['隔壁老王']
    # 日期,格式为:'2018-01-20'
    dtime = '2018-02-14'
    # 出发地(需填写cookie值)
    starts = '%u5317%u4EAC%2CBJP' #吴堡
    # 目的地(需填写cookie值)
    ends = '%u90A2%u53F0%2CXTP' #西安
    Buy_Tickets(username, password, order, passengers, dtime, starts, ends).start_buy()
最后修改:2019 年 04 月 22 日 03 : 58 PM

11 条评论

  1. Loekman

    想知道大师们是咋知道接口的

    1. moozik
      @Loekman

      浏览器F12就都看到了

  2. 学习笔记Blog

    这个好牛呀!

  3. True

    厉害了。。。。

    1. moozik
      @True

      刚发现真名忘了改了,赶紧改没了OωO

  4. liaozixu

    向python大佬低头

    1. moozik
      @liaozixu

      一点点

  5. xema

    心疼,不得不赶这波热潮

    1. moozik
      @xema

      是啊,然而现在我还没买到回家的票,也许要搭车回去了

  6. c0smxsec

    帝都人去河北工作,很强φ( ̄∇ ̄o)

    1. moozik
      @c0smxsec

      你说反了大佬

发表评论