drwxr-xr-x 5 root root 114 2月 12 12:35 hadoop-snappy-master

drwxr-xr-x 10 root root 205 2月 12 18:53 hive-3.1.3

drwxr-xr-x 6 root root 99 2月 12 12:23 maven-3.9.6

drwxr-xr-x 6 60692 5000 4096 2月 12 12:21 snappy-1.1.1

drwxrwxrwx 15 root root 235 2月 7 20:40 spark-3.5.0

drwxrwxrwx. 4 root root 32 2月 11 22:19 tmp

hdfs dfs -ls /

Found 3 items

drwxrwxrwx - root supergroup 0 2024-02-21 13:48 /sparklog

drwxrwxrwx - root supergroup 0 2024-02-17 01:28 /tmp

drwxrwxrwx - root supergroup 0 2024-02-17 01:04 /user


conda创建并激活虚拟环境



conda create -n pyspark python=3..
conda activate pyspark


python导包



import re
import time
import requests
import pandas as pd
from tqdm import tqdm
from lxml import etree
from pyhive import hive
from pyspark.sql import SparkSession
from pyecharts import charts
from pyecharts import options as opts


##### 二、获取数据


确定目标网址:https://www.hongheiku.com/category/gdjsgdp  
 获取伪装参数:‘user-agent’:‘Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36’  
 通过requests构造get请求获取单页html数据。  
 使用xpath解析式解析html标签数据获得省份、GDP列表数据和总页数整数数据。  
 通过pandas将列表数据转为数据帧dataframe。



获取单页数据

def get_page_data(page):
urls = {
‘page=1’:‘https://www.hongheiku.com/category/gdjsgdp’,
‘page>1’:‘https://www.hongheiku.com/category/gdjsgdp/page/{}’.format(page)}
headers = {
‘user-agent’:‘Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36’}
url = urls[‘page=1’] if page==1 else urls[‘page>1’]
response = requests.get(url=url,headers=headers)
if response.status_code != 200:
return response.status_code
if response.status_code == 200:
text = response.text
# with open(‘data.html’,‘w’,encoding=‘utf-8’) as file:
# file.write(text)
# with open(‘data.html’,‘r’,encoding=‘utf-8’) as file:
# text = file.read()
element = etree.HTML(text)
id = element.xpath(‘//tr[@class=“even”]/td[@class=“column-1”]//center//text()’)
pro = element.xpath(‘//tr[@class=“even”]/td[@class=“column-2”]//center//text()’)
gdp = element.xpath(‘//tr[@class=“even”]/td[@class=“column-3”]//center//text()’)
year = element.xpath(‘//tr[@class=“even”]/td[@class=“column-4”]//center//text()’)
df = pd.DataFrame({
‘id’:id,
‘pro’:pro,
‘gdp’:gdp,
‘year’:year})
pages = ‘’.join(element.xpath(‘//div[@class=“pagination pagination-multi”]//span/text()’))
pages = int(‘’.join(re.findall(‘共 ([0-9]) 页’,pages)))
time.sleep(1)
return df,pages


循环调用get\_page\_data函数获取所有页数据。



获取全部数据

def get_data():
df,pages = get_page_data(1)
for i in tqdm(range(2,pages+1,1)):
dfi,pages = get_page_data(i)
df = pd.concat([df,dfi])
df.to_csv(‘data.csv’,encoding=‘utf-8’,index=None)


##### 三、清洗数据


筛选出年份为2022年的数据。  
 去除GDP列自带的单位字符“亿”。  
 标准化省份数据,打通地区数据接口。



清洗数据

def clean_data(file_path):
df = pd.read_csv(file_path,encoding=‘utf-8’,dtype=str)
df = df[df[‘year’] == ‘2022’]
df[‘gdp’] = list(map(lambda x:float(str(x).rstrip(‘亿’)),df[‘gdp’]))
dict0 = {
‘重庆’:‘重庆市’,‘北京’:‘北京市’,‘天津’:‘天津市’,‘上海’:‘上海市’,
‘香港’:‘香港特别行政区’,‘澳门’:‘澳门特别行政区’,
‘内蒙古’:‘内蒙古自治区’,‘西藏’:‘西藏自治区’,
‘新疆’:‘新疆维吾尔自治区’,‘宁夏’:‘宁夏回族自治区’,‘广西’:‘广西壮族自治区’}
df[‘pro’] = list(map(
lambda x:str(x).lstrip(‘[’).partition(‘]’)[0],df[‘pro’]))
df[‘pro’] = list(map(
lambda x:dict0[x] if x in dict0.keys() else x +‘省’,df[‘pro’]))
return df


##### 四、聚合数据


按照省份对GDP进行求平均聚合。  
 对省平均GDP统一保留两位小数。



聚合数据

def work_data(df:pd.DataFrame):
df = df.groupby([‘pro’]).mean(‘gdp’).reset_index().sort_values(‘gdp’,ascending=False)
df.index = range(df.shape[0])
df[‘gdp’] = list(map(lambda x:round(x,2),df[‘gdp’]))
return df


##### 五、上传数据


启动伪分布式集群代码。



start-all.sh
mr-jobhistory-daemon.sh start historyserver
/opt/spark-3.5.0/sbin/start-all.sh
/opt/spark-3.5.0/sbin/start-history-server.sh
nohup hive --service metastore &


检测metastore(hive)端口是否启动成功。



netstat -anp|grep 9083

tcp6 0 0 :::9083 ::😗 LISTEN 9919/java


hive编写sql创建数据库。



create database myproject;


pyspark上传数据到metastore。



上传数据

def upload_data(df:pd.DataFrame):
global spark
spark = SparkSession.Builder(
).appName(‘test’).master(‘local[*]’)
.config(‘spark.sql.shuffle.partitions’,‘2’)
.config(‘spark.sql.warehouse.dir’,‘hdfs://ml:9000/user/hive/warehouse’)
.config(‘hive.metastore.uris’,‘thrift://ml:9083’
).enableHiveSupport().getOrCreate()
df = spark.createDataFrame(df)
spark.sql(‘drop table if exists myproject.data;’)
df.write.mode(‘overwrite’).saveAsTable(‘myproject.data’,‘parquet’)
df.show()


编写标准入口函数。



入口函数

def main():
try:
get_data()
except:
print(get_data())
df = clean_data(‘data.csv’)
df = work_data(df)
upload_data(df)
if name == ‘__main__’:
main()


上传代码到linux系统。


![img](https://img-blog.csdnimg.cn/img_convert/1f19a1cf4e3081f4f413d8ebab49a69e.png)
![img](https://img-blog.csdnimg.cn/img_convert/39a2a3032ce93775d4fede1aa95df702.png)
![img](https://img-blog.csdnimg.cn/img_convert/4c334db27eb12a3266394ad72f013dd6.png)

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**

main()

上传代码到linux系统。

[外链图片转存中…(img-e0aQt5dC-1725716520156)]
[外链图片转存中…(img-JAolBzf7-1725716520156)]
[外链图片转存中…(img-bgSWZ4qp-1725716520157)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐