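# A collection of single-threaded, multi-threaded, single-point and multi-point data-scraping routines.
# Uses a local proxy pool, monitors thread state (a thread that dies is restarted automatically), and tracks scraping progress.
# The code is rather messy -- take what you need.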
'''
@File : GetWindSpeed.py
@Time : 2021/11/01 10:46:55
@Author : JuYongkang
@Version : 1.0
@Contact : j_juyongkang@163.com
@Desc : None
'''
# here put the import lib
from math import radians, cos, sin, asin, sqrt, pi, tan, atan2, degrees
from fake_useragent import UserAgent
import requests
import json
import random
import threading
from threading import Lock, Thread
import time, os
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def geodistance(lng1, lat1, lng2, lat2):
    """Return the great-circle distance between two points, in kilometres.

    Uses the haversine formula with a mean Earth radius of 6371 km.

    :param lng1: longitude of the first point in degrees (anything ``float()`` accepts)
    :param lat1: latitude of the first point in degrees
    :param lng2: longitude of the second point in degrees
    :param lat2: latitude of the second point in degrees
    :return: distance in kilometres, rounded to 3 decimal places
    """
    # Convert every coordinate from degrees to radians.
    lng1, lat1, lng2, lat2 = map(radians, [float(lng1), float(lat1), float(lng2), float(lat2)])
    dlon = lng2 - lng1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can leave `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt()/asin() raise ValueError.
    a = min(1.0, max(0.0, a))
    distance_m = 2 * asin(sqrt(a)) * 6371 * 1000  # mean Earth radius 6371 km, result in metres
    return round(distance_m / 1000, 3)
def Get_Coordinates(lon1, lat1, brng, d, radius=6371):
    """Return the point reached from a start point along a bearing and distance.

    Solves the direct problem on a sphere: bearing 0 moves towards increasing
    latitude, bearing pi/2 towards increasing longitude.

    :param lon1: start longitude in degrees
    :param lat1: start latitude in degrees
    :param brng: bearing in radians
    :param d: distance to travel, in the same unit as ``radius`` (kilometres by default)
    :param radius: sphere radius; defaults to 6371, the mean Earth radius in km
    :return: tuple ``(lon2, lat2)`` in degrees, each rounded to 6 decimal places
    """
    angular_distance = d / radius
    lon1 = radians(lon1)
    lat1 = radians(lat1)

    lat2 = asin(sin(lat1) * cos(angular_distance)
                + cos(lat1) * sin(angular_distance) * cos(brng))
    lon2 = lon1 + atan2(sin(brng) * sin(angular_distance) * cos(lat1),
                        cos(angular_distance) - sin(lat1) * sin(lat2))

    # NOTE: the longitude is deliberately not wrapped into [-180, 180]; callers
    # in this file step eastwards and compare against an upper bound.
    return round(degrees(lon2), 6), round(degrees(lat2), 6)
def Get_Parameter(lng1, lat1):
    """Return a closed ring of five ``[lon, lat]`` points around a centre point.

    The ring visits bearings 225, 315, 45, 135 and 225 degrees again, so the
    first and last entries coincide. Each point lies ``1.5 / sin(45 deg)`` km
    from the centre, i.e. the corners of a square roughly 3 km on a side.

    :param lng1: centre longitude in degrees
    :param lat1: centre latitude in degrees
    :return: list of five ``[lon, lat]`` lists
    """
    corner_distance = 1.5 / sin(radians(45))
    ring_bearings = (225, 315, 45, 135, 225)
    return [
        list(Get_Coordinates(lng1, lat1, radians(bearing), corner_distance))
        for bearing in ring_bearings
    ]
def Send_Request(lng1, lat1, url, payload):
    """POST one payload to ``url`` and append the returned wind speed to the data file.

    The response is read as JSON; the value at ``['area_means'][0]['val']`` is
    written, with the point and ``payload['height']``, as one line of the
    hard-coded output file.

    :param lng1: longitude of the requested point (used for logging and output only)
    :param lat1: latitude of the requested point (used for logging and output only)
    :param url: endpoint to POST to
    :param payload: JSON-serialisable dict; must contain the key ``'height'``
    :return: None. If the request fails or the body is not JSON, the error is
        printed and nothing is written.
    :raises KeyError, IndexError: if the JSON body lacks the expected ``area_means`` entry
    """
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': UserAgent().random,
    }
    print('请求点位-------------------', lng1, lat1)
    body = json.dumps(payload)

    # The context manager closes the session's pooled connections on every path.
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        try:
            resp = s.post(url, headers=headers, data=body, timeout=20)
            res = json.loads(resp.text)
        except (requests.exceptions.RequestException, ValueError) as e:
            # Previously execution fell through and crashed with NameError on
            # the unbound `res`; without a parsed response there is nothing to record.
            print(e)
            return

    wind_speed = res['area_means'][0]['val']
    height = payload['height']
    print('风速----------------', wind_speed)
    with open('C:/Users/JK/Documents/Projects/Test/numpys/data.txt', 'a+') as f:
        f.write('{} {} {}m {}m/s\n'.format(lng1, lat1, height, wind_speed))
    print('-' * 20)
def main(lng1, lat1, lng2, lat2, d, url):
    """Walk a grid over a bounding box and request the wind speed for every grid point.

    The grid is walked row by row: eastwards in 0.5 km steps until the
    longitude passes ``lng2``, then back to ``lng1`` and 0.5 km north, until
    the latitude passes ``lat2``.

    :param lng1: start longitude (lower-left corner)
    :param lat1: start latitude (lower-left corner)
    :param lng2: end longitude (upper-right corner)
    :param lat2: end latitude (upper-right corner)
    :param d: span; NOTE(review): currently unused, the step is the literal 0.5 km
    :param url: request URL
    :return: None
    """
    visited = []
    cur_lng, cur_lat = lng1, lat1
    while cur_lat <= lat2:
        while cur_lng <= lng2:
            # Ring of points surrounding the current centre point.
            ring = Get_Parameter(cur_lng, cur_lat)
            payload = {'height': 100, 'coord': [ring]}
            Send_Request(cur_lng, cur_lat, url, payload)
            visited.append([cur_lng, cur_lat])
            # Step east; the latitude returned by the step is carried along.
            cur_lng, cur_lat = Get_Coordinates(cur_lng, cur_lat, radians(90), 0.5)
        # Row finished: restart at the western edge and step north.
        cur_lng, cur_lat = Get_Coordinates(lng1, cur_lat, radians(0), 0.5)
    print('结束')
    print(visited)
    print('一共请求 %s 个点位' % len(visited))
def Get_CenterPoint(lng1, lat1, lng2, lat2, d):
    """Return every grid centre point inside a bounding box.

    Points are generated row by row: eastwards in 0.5 km steps until the
    longitude passes ``lng2``, then back to ``lng1`` and 0.5 km north, until
    the latitude passes ``lat2``.

    :param lng1: start longitude
    :param lat1: start latitude
    :param lng2: end longitude
    :param lat2: end latitude
    :param d: NOTE(review): currently unused, the step is the literal 0.5 km
    :return: list of ``[lng, lat]`` lists, also printed to stdout
    """
    centres = []
    cur_lng, cur_lat = lng1, lat1
    while cur_lat <= lat2:
        while cur_lng <= lng2:
            centres.append([cur_lng, cur_lat])
            # Step east; the latitude returned by the step is carried along.
            cur_lng, cur_lat = Get_Coordinates(cur_lng, cur_lat, radians(90), 0.5)
        # Row finished: restart at the western edge and step north.
        cur_lng, cur_lat = Get_Coordinates(lng1, cur_lat, radians(0), 0.5)
    print('区域内一共取到{}个点位'.format(len(centres)))
    print(centres)
    return centres
def test(lng, lat):
    """Single-point test: request the wind rose for one point and print it.

    Reads the module-level ``url`` that the ``__main__`` section sets.

    :param lng: longitude of the point in degrees
    :param lat: latitude of the point in degrees
    :return: None; the result line ``"<lng> <lat> <v1> <v2> ..."`` is printed
    """
    ring = Get_Parameter(lng, lat)
    lis_fourpoint = [ring]
    print('lis--------', lis_fourpoint)
    payload = {'height': 200, 'coord': lis_fourpoint}
    print('payload-----------', payload)
    body = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json',
        'User-Agent': UserAgent().random,
    }
    # requests picks the proxy by URL scheme: a mapping with only an 'http'
    # key is skipped for https:// URLs, so the proxy is registered for both.
    proxy_url = 'http://' + '59.63.74.97:3256'
    proxy = {'http': proxy_url, 'https': proxy_url}

    # The context manager closes the session's pooled connections when done.
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=3))
        s.mount('https://', HTTPAdapter(max_retries=3))
        # NOTE(review): Session has no documented keep_alive attribute; this
        # assignment probably has no effect -- confirm before relying on it.
        s.keep_alive = False
        resp = s.post(url, headers=headers, proxies=proxy, verify=False, data=body, timeout=50)
        res = json.loads(resp.text)

    rose_str = ''.join(' ' + str(entry['value']) for entry in res['rose'])
    ss = str(lng) + ' ' + str(lat) + rose_str
    print('结果:')
    print(ss)
def Get_Proxy():
    """Fetch one proxy address from the local proxy pool.

    Queries ``http://localhost:5555/random`` and wraps the returned text in a
    ``proxies`` mapping for requests.

    :return: dict mapping both ``'http'`` and ``'https'`` to ``'http://<address>'``
    :raises requests.exceptions.RequestException: if the pool cannot be reached
        within the timeout
    """
    # A timeout keeps a hung proxy-pool service from blocking the caller forever.
    pro = requests.get('http://localhost:5555/random', timeout=10)
    # Strip stray whitespace/newlines so the proxy URL stays valid.
    proxy_url = 'http://' + pro.text.strip()
    # requests picks the proxy by URL scheme, so register it for https too;
    # an 'http'-only mapping is skipped for https:// targets.
    return {'http': proxy_url, 'https': proxy_url}
def _requeue_point(point_list, point, log_lines):
    """Append ``log_lines`` to log.txt and put ``point`` back on the shared queue, under the lock."""
    with lock:
        with open(base_dir + '/log.txt', 'a+') as w:
            w.writelines(log_lines)
        point_list.append(point)


def work(point_list, url):
    """Worker thread body: fetch the wind rose for points taken from a shared list.

    Each iteration pops one ``[lng, lat]`` point, POSTs the surrounding ring to
    ``url`` through a randomly chosen proxy and appends the result to
    ``windFrequencyRose_200_2.txt``. On any failure the point is put back on
    the list, the failure is logged and the function returns, ending the
    thread; ``check`` restarts threads that have ended while points remain.

    Relies on the module-level ``lock``, ``proxies`` and ``base_dir`` that the
    ``__main__`` section sets.

    :param point_list: shared, mutable list of ``[lng, lat]`` points (consumed in place)
    :param url: endpoint to POST to
    :return: None
    """
    for _ in range(len(point_list)):
        # Check-and-pop must be one atomic step: another worker may empty the
        # list between an unlocked length check and the pop.
        with lock:
            if not point_list:
                print('点位已被取完------------------------')
                return
            point = point_list.pop(0)
        lng, lat = point[0], point[1]

        payload = {'height': 200, 'coord': [Get_Parameter(lng, lat)]}
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': UserAgent().random,
        }
        # requests picks the proxy by URL scheme: a mapping with only an 'http'
        # key is skipped for https:// URLs, so register the proxy for both.
        proxy_url = 'http://' + random.choice(proxies)
        proxy = {'http': proxy_url, 'https': proxy_url}
        body = json.dumps(payload)

        failure_lines = None
        rose_str = ''
        # The context manager closes the session's pooled connections on every path.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=3))
            s.mount('https://', HTTPAdapter(max_retries=3))
            # NOTE(review): Session has no documented keep_alive attribute; this
            # assignment probably has no effect -- confirm before relying on it.
            s.keep_alive = False
            try:
                resp = s.post(url, headers=headers, proxies=proxy, verify=False, data=body, timeout=50)
                res = json.loads(resp.text)
                print(resp.text)
                if 'status' in res:
                    # A 'status' key is treated as a failed request (the
                    # original comment suggests a rate-limited IP).
                    failure_lines = [
                        '请求失败,{}点位未获取到rose_value---------\n'.format(point),
                        '{}点位已重新放入\n'.format(point),
                    ]
                else:
                    rose_str = ''.join(',' + str(entry['value']) for entry in res['rose'])
            except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
                # Non-JSON or unexpectedly shaped bodies used to kill the thread
                # without re-queueing the point; handle them like request errors.
                print(e)
                print('point点位为---------', point)
                failure_lines = [
                    '{}点位请求异常------------------\n'.format(point),
                    '{}点位已被重新放入\n'.format(point),
                ]

        if failure_lines is not None:
            _requeue_point(point_list, point, failure_lines)
            time.sleep(5)
            return

        # `with lock` releases the lock even if one of the writes raises.
        with lock:
            with open(base_dir + '/windFrequencyRose_200_2.txt', 'a+') as f:
                f.write(str(lng) + ',' + str(lat) + rose_str + '\n')
            with open(base_dir + '/log.txt', 'a+') as log_file:
                log_file.write('{},{}点位已写入, 风向为{}\n'.format(lng, lat, rose_str))
        time.sleep(5)
def work1(point_list, url, i):
    """Thread test: pop points from the shared list and print them.

    Relies on the module-level ``lock`` that the ``__main__`` section sets.

    :param point_list: shared, mutable list of points (consumed in place)
    :param url: unused; kept so the signature parallels ``work``
    :param i: label of the calling thread, used only in the printed output
    :return: None
    """
    time.sleep(1)
    for _ in range(len(point_list)):
        # Several threads drain the same list, so it may already be empty.
        # Popping blindly raised IndexError while holding the lock, which left
        # the lock held forever and deadlocked the other threads.
        with lock:
            if not point_list:
                return
            ss = point_list.pop(0)
            ll = len(point_list)
        time.sleep(2)
        print('{}线程取出的元素为----------------------------------------------{}'.format(i, ss))
        print('列表剩余点位: ', ll)
    return
def check(init_thread_name=None):
    """Monitor loop: restart worker threads that have ended while points remain.

    Every 20 seconds the names of the live threads are compared with
    ``init_thread_name``. A missing name is logged to ``listen.txt``; if
    ``lis_point`` still has points, a new ``work`` thread with that name is
    started. Once the list is empty, missing names are tolerated three times,
    after which ``listen`` is set to False and the loop ends.

    Relies on the module-level ``listen``, ``lis_point``, ``base_dir`` and
    ``url`` that the ``__main__`` section sets.

    :param init_thread_name: names of the threads to watch; defaults to none
    :return: None
    """
    global listen
    # Use None instead of a mutable default, which is shared between calls.
    if init_thread_name is None:
        init_thread_name = []
    print('监听线程开启----------')
    num = 3  # remaining tolerated "thread gone, queue empty" observations
    while listen:
        # Thread.name replaces getName(), deprecated since Python 3.10.
        now_thread_name = [thread.name for thread in threading.enumerate()]
        print('正在运行的线程有+++++++++++', now_thread_name)
        for ip in init_thread_name:
            if ip in now_thread_name:
                continue
            with open(base_dir + '/listen.txt', 'a+') as li:
                li.write(ip + '线程已挂掉!!!!!!!!!!!!\n')
            if len(lis_point) != 0:
                with open(base_dir + '/listen.txt', 'a+') as li:
                    li.write(ip + '线程重启中------------\n')
                t = threading.Thread(target=work, args=(lis_point, url), name=ip)
                t.start()
            elif num == 0:
                listen = False
                break
            else:
                num -= 1
        time.sleep(20)
    else:
        # Runs when the while condition turns false, i.e. once `listen` is cleared.
        with open(base_dir + '/listen.txt', 'a+') as li:
            li.write('点位取完,监听退出\n')
if __name__ == "__main__":
    # Script entry point. The names assigned here (base_dir, url, lis_point,
    # lock, proxies, listen) are module globals read by work() and check().

    # abspath() guards against __file__ being a bare file name: dirname()
    # would then return '' and every data path would resolve against '/'.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # lng1 = 109.951172
    # lat1 = 35.995785
    # # lng1 = 114.873047
    # # lat1 = 32.175612
    # d = 15 / sin(radians(45))
    # lng2, lat2 = Get_Coordinates(lng1, lat1, radians(45), d)
    # print(lng2, lat2)
    # d1 = 0.5
    # url = 'https://globalwindatlas.info/api/gwa/custom/windSpeed'
    url = 'https://globalwindatlas.info/api/gwa/custom/windFrequencyRose'
    # -----------------------------------------------------------------------------
    # Single-threaded entry point
    # main(lng1, lat1, lng2, lat2, d, url)
    # -----------------------------------------------------------------------------
    # -----------------------------------------------------------------------------
    # Collect all points of the area (computed)
    # lis_point = Get_CenterPoint(lng1, lat1, lng2, lat2, d1)
    # -----------------------------------------------------------------------------
    # -----------------------------------------------------------------------------
    # Collect all points of the area (loaded from file, one "lng,lat" per line)
    lis_point = []
    with open(base_dir + '/points_v1.txt', 'r') as r:
        for line in r:
            line = line.strip()
            if not line:
                # Skip blank lines (e.g. a trailing newline) instead of crashing in float().
                continue
            fields = line.split(',')
            lis_point.append([float(fields[0]), float(fields[1])])
    # print('all area points-----', lis_point)
    # -----------------------------------------------------------------------------
    # -----------------------------------------------------------------------------
    # Single-point request
    # lng_test, lat_test = 122.64125,36.53125
    # test(lng_test, lat_test)
    # -----------------------------------------------------------------------------
    # -----------------------------------------------------------------------------
    # Multi-threaded entry point
    start_time = time.time()
    lock = threading.Lock()
    threads = []
    with open(base_dir + '/proxy.txt', 'r') as r:
        # One proxy address per line; blank lines would yield an unusable 'http://' proxy.
        proxies = [line.strip() for line in r if line.strip()]
    # print(proxies)
    for i in range(5):
        t = threading.Thread(target=work, args=(lis_point, url), name='thread' + str(i))
        # t = threading.Thread(target=work1, args=(lis_point, url, i), name='thread' + str(i))
        threads.append(t)
        t.start()
    # Thread.name replaces getName(), deprecated since Python 3.10.
    init_thread_name = [thread.name for thread in threading.enumerate()]
    print('线程列表+++++++', init_thread_name)
    time.sleep(10)
    listen = True
    check(init_thread_name)
    end_time = time.time()
    print('结束++++++++++++++')
    with open(base_dir + '/log.txt', 'a+') as w:
        w.write('结束,总用时:{}'.format(end_time - start_time))
    # -----------------------------------------------------------------------------