python多线程爬取globalwindatlas风速风向数据

python多线程爬取globalwindatlas风速风向数据

Fre_soe 467 2022-11-07

本文整合了单线程、多线程以及单点、多点的数据爬取方法；
使用本地代理池，监控线程状态（线程中断后自动重启），并监控数据爬取进度。
代码比较乱，按需自取。


'''
@File    :   GetWindSpeed.py
@Time    :   2021/11/01 10:46:55
@Author  :   JuYongkang 
@Version :   1.0
@Contact :   j_juyongkang@163.com
@Desc    :   None
'''

# here put the import lib
from math import radians, cos, sin, asin, sqrt, pi, tan, atan2, degrees
from fake_useragent import UserAgent
import requests
import json
import random
import threading
from threading import Lock, Thread
import time, os
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# test() and work() send their HTTPS requests with verify=False (certificate
# checks disabled), which would otherwise make urllib3 emit an
# InsecureRequestWarning for each such request; silence it globally.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)




def geodistance(lng1, lat1, lng2, lat2, radius_km=6371):
    """Return the great-circle distance between two lon/lat points (haversine).

    :param lng1: longitude of the first point, degrees (anything ``float()``
        accepts, e.g. a numeric string)
    :param lat1: latitude of the first point, degrees
    :param lng2: longitude of the second point, degrees
    :param lat2: latitude of the second point, degrees
    :param radius_km: sphere radius in kilometres; defaults to the mean Earth
        radius (6371 km) used elsewhere in this script
    :return: distance in kilometres, rounded to 3 decimals (metre precision)
    """
    lng1, lat1, lng2, lat2 = map(radians, map(float, (lng1, lat1, lng2, lat2)))
    dlon = lng2 - lng1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can leave `a` a hair above 1.0 for (near-)
    # antipodal points, which would make asin() raise "math domain error".
    a = min(1.0, a)
    return round(2 * asin(sqrt(a)) * radius_km, 3)


def Get_Coordinates(lon1, lat1, brng, d):
    """Return the destination point given a start point, a bearing and a distance.

    Uses the great-circle "destination point" formula on a sphere whose
    radius is 6371 km.

    :param lon1: start longitude, degrees
    :param lat1: start latitude, degrees
    :param brng: initial bearing in radians, measured from north towards east
        (0 = north, radians(90) = east)
    :param d: distance to travel, kilometres
    :return: ``(lon2, lat2)`` in degrees, each rounded to 6 decimals. The
        longitude is not wrapped, so it can leave [-180, 180] when the path
        crosses the antimeridian.
    """
    earth_radius_km = 6371
    angular = d / earth_radius_km  # angular distance travelled, radians
    phi = radians(lat1)
    lam = radians(lon1)

    phi_out = asin(sin(phi) * cos(angular) + cos(phi) * sin(angular) * cos(brng))
    delta_lam = atan2(sin(brng) * sin(angular) * cos(phi),
                      cos(angular) - sin(phi) * sin(phi_out))
    lam_out = lam + delta_lam

    return round(degrees(lam_out), 6), round(degrees(phi_out), 6)

 

def Get_Parameter(lng1, lat1):
    """Return the corners of a ~3 km x 3 km square centred on a point, as a closed ring.

    Each corner lies 1.5 / sin(45°) km (half the diagonal of a 3 km square)
    from the centre along a diagonal bearing. Corners are emitted in the
    order SW, NW, NE, SE, SW, so the first corner is repeated at the end to
    close the polygon.

    :param lng1: centre longitude, degrees
    :param lat1: centre latitude, degrees
    :return: list of five ``[lon, lat]`` lists
    """
    ring = []
    # Bearings in degrees: SW, NW, NE, SE, then SW again to close the ring.
    for bearing_deg in (225, 315, 45, 135, 225):
        bearing = radians(bearing_deg)
        half_diagonal_km = 1.5 / sin(radians(45))
        corner_lng, corner_lat = Get_Coordinates(lng1, lat1, bearing, half_diagonal_km)
        ring.append([corner_lng, corner_lat])
    return ring


def Send_Request(lng1, lat1, url, payload,
                 out_path='C:/Users/JK/Documents/Projects/Test/numpys/data.txt'):
    """POST one area query and append the returned mean wind speed to a file.

    :param lng1: longitude of the queried centre point (used for logging and
        in the output line)
    :param lat1: latitude of the queried centre point
    :param url: API endpoint to POST to
    :param payload: request body, serialised as JSON; must contain 'height'
    :param out_path: text file the ``"<lng> <lat> <height>m <speed>m/s"``
        line is appended to
    :return: the value read from ``area_means[0]['val']``, or None when the
        request failed or the response did not have that shape
    """
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': UserAgent().random,
    }
    print('请求点位-------------------', lng1, lat1)
    body = json.dumps(payload)
    with requests.Session() as s:  # closes pooled connections on exit
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        try:
            resp = s.post(url, headers=headers, data=body, timeout=20)
            wind_speed = json.loads(resp.text)['area_means'][0]['val']
        except requests.exceptions.RequestException as e:
            # Previously execution fell through here and crashed on the
            # unbound response variable; report and skip this point instead.
            print(e)
            return None
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # Non-JSON body or a JSON body without area_means[0]['val'].
            print(e)
            return None
    height = payload['height']
    print('风速----------------', wind_speed)
    with open(out_path, 'a+') as f:
        f.write(str(lng1) + ' ' + str(lat1) + ' ' + str(height) + 'm' + ' ' + str(wind_speed) + 'm/s' + '\n')
    print('-' * 20)
    return wind_speed



def main(lng1, lat1, lng2, lat2, d, url, *, step_km=0.5, height=100):
    """Walk a lon/lat rectangle on a grid and query the wind speed at every node.

    Starting at the south-west corner, the walk steps east by ``step_km``
    until the longitude exceeds ``lng2``. It then goes back to ``lng1``,
    steps north by ``step_km``, and repeats until the latitude exceeds
    ``lat2``. For each node it sends the square returned by Get_Parameter
    via Send_Request.

    :param lng1: start longitude (south-west corner), degrees
    :param lat1: start latitude (south-west corner), degrees
    :param lng2: end longitude (north-east corner), degrees
    :param lat2: end latitude (north-east corner), degrees
    :param d: unused, kept for backward compatibility. The grid spacing is
        ``step_km`` (0.5 km by default), not ``d``.
    :param url: API endpoint
    :param step_km: grid spacing in kilometres; must be positive
    :param height: value sent as the request's 'height' field
    :raises ValueError: if ``step_km`` is not positive (the walk would never end)
    """
    if step_km <= 0:
        raise ValueError('step_km must be positive')
    lng, lat = lng1, lat1
    visited = []
    while lat <= lat2:
        while lng <= lng2:
            payload = {'height': height, 'coord': [Get_Parameter(lng, lat)]}
            Send_Request(lng, lat, url, payload)
            visited.append([lng, lat])
            lng, lat = Get_Coordinates(lng, lat, radians(90), step_km)
        # Next row: back to the western edge, one step north.
        lng, lat = Get_Coordinates(lng1, lat, radians(0), step_km)
    print('结束')
    print(visited)
    print('一共请求 %s 个点位' % len(visited))


def Get_CenterPoint(lng1, lat1, lng2, lat2, d, *, step_km=0.5):
    """Return every grid node inside a lon/lat rectangle.

    Starting at ``(lng1, lat1)``, the walk steps east by ``step_km`` until the
    longitude exceeds ``lng2``. It then goes back to ``lng1``, steps north by
    ``step_km``, and repeats until the latitude exceeds ``lat2``.

    :param lng1: start longitude (south-west corner), degrees
    :param lat1: start latitude (south-west corner), degrees
    :param lng2: end longitude (north-east corner), degrees
    :param lat2: end latitude (north-east corner), degrees
    :param d: unused, kept for backward compatibility. The grid spacing is
        ``step_km`` (0.5 km by default).
    :param step_km: grid spacing in kilometres; must be positive
    :return: list of ``[lng, lat]`` lists in walk order
    :raises ValueError: if ``step_km`` is not positive (the walk would never end)
    """
    if step_km <= 0:
        raise ValueError('step_km must be positive')
    points = []
    lng, lat = lng1, lat1
    while lat <= lat2:
        while lng <= lng2:
            points.append([lng, lat])
            lng, lat = Get_Coordinates(lng, lat, radians(90), step_km)
        # Next row: back to the western edge, one step north.
        lng, lat = Get_Coordinates(lng1, lat, radians(0), step_km)
    print('区域内一共取到{}个点位'.format(len(points)))
    print(points)
    return points


def test(lng, lat, target_url=None, height=200):
    """Query the wind-frequency rose for a single point and print it (manual smoke test).

    :param lng: centre longitude, degrees
    :param lat: centre latitude, degrees
    :param target_url: endpoint to POST to; defaults to the module-level
        ``url`` assigned in the ``__main__`` section
    :param height: value sent as the request's 'height' field
    :return: the printed result line ``"<lng> <lat> <v1> <v2> ..."``, where the
        values are the ``value`` entries of the response's ``rose`` list
    """
    if target_url is None:
        target_url = url  # module global assigned under __main__
    payload = {'height': height, 'coord': [Get_Parameter(lng, lat)]}
    print('lis--------', payload['coord'])
    print('payload-----------', payload)

    headers = {
        'Content-Type': 'application/json',
        'User-Agent': UserAgent().random,
    }
    # NOTE(review): only the 'http' scheme is mapped, but the endpoint is
    # https, so requests will most likely not route this call through the
    # proxy at all. Confirm whether an 'https' entry was intended.
    proxy = {'http': 'http://' + '59.63.74.97:3256'}
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        resp = s.post(target_url, headers=headers, proxies=proxy, verify=False,
                      data=json.dumps(payload), timeout=50)
        res = json.loads(resp.text)

    rose_str = ''.join(' ' + str(item['value']) for item in res['rose'])
    line = str(lng) + ' ' + str(lat) + rose_str
    print('结果:')
    print(line)
    return line


def Get_Proxy(pool_url='http://localhost:5555/random', timeout=10):
    """Fetch one proxy address from the local proxy-pool service.

    Assumes the service answers with a bare ``host:port`` body (this is how
    the text is used below — confirm against the pool implementation).

    :param pool_url: proxy-pool endpoint returning a random proxy
    :param timeout: seconds to wait for the pool before requests raises
        (previously there was no timeout, so a stuck pool hung forever)
    :return: a requests ``proxies`` mapping, e.g. ``{'http': 'http://1.2.3.4:8080'}``
    """
    resp = requests.get(pool_url, timeout=timeout)
    # Strip a trailing newline/whitespace so it does not end up in the URL.
    address = resp.text.strip()
    return {'http': 'http://' + address}



def work(point_list, url):
    """Worker thread: pop points from a shared list and save each point's wind rose.

    Loops until ``point_list`` is empty. For each point it:
    - POSTs the square around it (Get_Parameter) to ``url`` with height 200,
      through a random proxy from the global ``proxies`` list;
    - appends ``lng,lat,v1,v2,...`` to ``windFrequencyRose_200_2.txt`` and a
      line to ``log.txt``, both under ``base_dir``.

    On any failure (network error, non-JSON body, a body containing
    'status', or a body without ``rose``) the point is appended back to
    ``point_list`` and logged. The thread then sleeps 5 s and exits; the
    ``check`` monitor restarts dead threads while points remain.

    Relies on module globals assigned in ``__main__``: ``lock`` (guards
    ``point_list`` and the output files), ``proxies`` and ``base_dir``.
    """

    def _requeue(point, reasons):
        # Put the point back for a later attempt, then log why. Both happen
        # under the shared lock; appending first means a failing log write
        # cannot lose the point.
        with lock:
            point_list.append(point)
            with open(base_dir + '/log.txt', 'a+') as log:
                for reason in reasons:
                    log.write(reason)
        time.sleep(5)

    while True:
        # Prepare everything that can fail *before* popping, so a failure
        # here (e.g. an empty proxy list) cannot drop a point.
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': UserAgent().random,
        }
        # NOTE(review): only 'http' is mapped while the endpoint is https, so
        # requests most likely sends these calls directly, not via the proxy.
        proxy = {'http': 'http://' + random.choice(proxies)}

        with lock:
            # Check and pop under the same lock. Checking outside it let
            # another thread empty the list first, and the IndexError from
            # pop() was raised while the lock was held, freezing every worker.
            if not point_list:
                print('点位已被取完------------------------')
                return
            point = point_list.pop(0)

        lng, lat = point[0], point[1]
        payload = {'height': 200, 'coord': [Get_Parameter(lng, lat)]}

        try:
            with requests.Session() as s:
                s.mount('http://', HTTPAdapter(max_retries=3))
                s.mount('https://', HTTPAdapter(max_retries=3))
                resp = s.post(url, headers=headers, proxies=proxy, verify=False,
                              data=json.dumps(payload), timeout=50)
                try:
                    body = resp.text
                finally:
                    resp.close()
        except requests.exceptions.RequestException as e:
            print(e)
            print('point点位为---------', point)
            _requeue(point, ['{}点位请求异常------------------\n'.format(point),
                             '{}点位已被重新放入\n'.format(point)])
            return

        print(body)
        failed = ['请求失败,{}点位未获取到rose_value---------\n'.format(point),
                  '{}点位已重新放入\n'.format(point)]
        try:
            res = json.loads(body)
            if 'status' in res:
                # Treated as a refused request (the original comment says the
                # IP was rate-limited); retry the point later.
                _requeue(point, failed)
                return
            rose_str = ''.join(',' + str(item['value']) for item in res['rose'])
        except (ValueError, KeyError, TypeError) as e:
            # Previously these escaped, killing the thread and losing the point.
            print(e)
            _requeue(point, failed)
            return

        with lock:
            with open(base_dir + '/windFrequencyRose_200_2.txt', 'a+') as f:
                f.write(str(lng) + ',' + str(lat) + rose_str + '\n')
            with open(base_dir + '/log.txt', 'a+') as log:
                log.write('{},{}点位已写入, 风向为{}\n'.format(lng, lat, rose_str))
        time.sleep(5)



def work1(point_list, url, i):
    """Thread smoke test: take one point from the shared list and print it.

    Sleeps 1 s, pops the first element of ``point_list`` under the global
    ``lock``, sleeps 2 s, then prints what it took and how many remain.
    Returns without doing anything if the list is empty.

    Only a single element is taken per call, matching the original
    ``return`` inside the loop. NOTE(review): confirm whether draining the
    whole list was intended.

    :param point_list: shared list of points
    :param url: unused
    :param i: thread index shown in the printed message
    """
    time.sleep(1)
    with lock:
        # Check under the lock: popping an empty list between acquire() and
        # release() raised IndexError and left the lock held for good.
        if not point_list:
            return
        ss = point_list.pop(0)
        ll = len(point_list)
    time.sleep(2)
    print('{}线程取出的元素为----------------------------------------------{}'.format(i, ss))
    print('列表剩余点位: ', ll)


def check(init_thread_name=None):
    """Monitor worker threads and restart any that have died.

    Every 20 s it compares the running thread names with ``init_thread_name``.
    Each missing name is logged to ``listen.txt``. While the global
    ``lis_point`` still holds points, a new ``work`` thread with the same name
    is started.

    Once the list is empty and dead threads are observed, the monitor counts
    down a grace period of three further sweeps, then sets the global
    ``listen`` to False and exits. The countdown is once per sweep; it used to
    be once per dead thread, which could use up the grace in a single sweep
    while a live worker was about to re-queue a failed point. It resets
    whenever points reappear.

    Relies on module globals assigned in ``__main__``: ``listen``,
    ``lis_point``, ``url`` and ``base_dir``.

    :param init_thread_name: names of the threads to keep alive
    """
    global listen, lis_point
    if init_thread_name is None:
        init_thread_name = []
    print('监听线程开启----------')
    grace = 3
    while listen:
        now_thread_name = [t.name for t in threading.enumerate()]
        print('正在运行的线程有+++++++++++', now_thread_name)
        dead = [name for name in init_thread_name if name not in now_thread_name]
        for name in dead:
            with open(base_dir + '/listen.txt', 'a+') as li:
                li.write(name + '线程已挂掉!!!!!!!!!!!!\n')
            if lis_point:
                with open(base_dir + '/listen.txt', 'a+') as li:
                    li.write(name + '线程重启中------------\n')
                threading.Thread(target=work, args=(lis_point, url), name=name).start()
        if lis_point:
            grace = 3
        elif dead:
            if grace == 0:
                listen = False
            else:
                grace -= 1
        time.sleep(20)
    with open(base_dir + '/listen.txt', 'a+') as li:
        li.write('点位取完,监听退出\n')



if __name__ == "__main__":
    # Names assigned here are module globals read by work(), check() and
    # test(): base_dir, url, lis_point, lock, proxies, listen.
    # abspath() guards against dirname() returning '' on older Pythons, which
    # would turn base_dir + '/x.txt' into a path at the filesystem root.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # lng1 = 109.951172
    # lat1 = 35.995785
    # # lng1 = 114.873047
    # # lat1 = 32.175612
    # d = 15 / sin(radians(45))
    # lng2, lat2 = Get_Coordinates(lng1, lat1, radians(45), d)
    # print(lng2, lat2)
    # d1 = 0.5

    # url = 'https://globalwindatlas.info/api/gwa/custom/windSpeed'
    url = 'https://globalwindatlas.info/api/gwa/custom/windFrequencyRose'

    # -----------------------------------------------------------------------------
    # Single-threaded entry point
    # main(lng1, lat1, lng2, lat2, d, url)
    # -----------------------------------------------------------------------------

    # -----------------------------------------------------------------------------
    # All points of the region, computed on a grid
    # lis_point = Get_CenterPoint(lng1, lat1, lng2, lat2, d1)
    # -----------------------------------------------------------------------------

    # -----------------------------------------------------------------------------
    # All points of the region, loaded from file (one "lng,lat" per line)
    lis_point = []
    with open(base_dir + '/points_v1.txt', 'r') as r:
        for line in r:
            if not line.strip():
                continue  # blank/trailing lines used to crash float('')
            fields = line.split(',')
            lis_point.append([float(fields[0]), float(fields[1])])
    # -----------------------------------------------------------------------------

    # -----------------------------------------------------------------------------
    # Single-point query
    # lng_test, lat_test = 122.64125,36.53125
    # test(lng_test, lat_test)
    # -----------------------------------------------------------------------------

    # -----------------------------------------------------------------------------
    # Multi-threaded entry point
    start_time = time.time()
    lock = threading.Lock()
    with open(base_dir + '/proxy.txt', 'r') as r:
        # One "host:port" per line; strip '\r'/spaces and skip blank lines,
        # which would otherwise become an empty proxy address.
        proxies = [line.strip() for line in r if line.strip()]
    if not proxies:
        raise SystemExit('proxy.txt contains no proxies')
    workers = []
    for i in range(5):
        t = threading.Thread(target=work, args=(lis_point, url), name='thread' + str(i))
        # t = threading.Thread(target=work1, args=(lis_point, url, i), name='thread' + str(i))
        workers.append(t)
        t.start()
    # Track workers by the names given above. Enumerating live threads
    # instead would miss any worker that had already exited, so it would
    # never be restarted.
    init_thread_name = [t.name for t in workers]
    print('线程列表+++++++', init_thread_name)
    time.sleep(10)
    listen = True
    check(init_thread_name)
    # check() can return while a worker is still finishing its last request;
    # wait for the remaining non-daemon threads so the total time is complete.
    for t in threading.enumerate():
        if t is not threading.current_thread() and not t.daemon:
            t.join()
    end_time = time.time()
    print('结束++++++++++++++')
    with open(base_dir + '/log.txt', 'a+') as w:
        w.write('结束,总用时:{}\n'.format(end_time - start_time))
    # -----------------------------------------------------------------------------