How to Get Data from Hive to a Local Machine 🦄

We use pyspark to read the table from Hive and write it to HDFS, get it onto the jump server with hadoop fs -get, rsync it to the local machine, and finally load it into a DataFrame with dask.

Version 1

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # connect to the cluster with Hive support enabled
    spark = SparkSession.builder.master("yarn").appName("pyspark location process").enableHiveSupport().getOrCreate()
    sc = spark.sparkContext

    # spark.sql('show databases').show()
    spark.sql('use annals').show()
    # spark.sql('describe gps2').show()
    spark.sql('select * from gps2 limit 1').show()

    # pull a few columns into a Spark DataFrame
    sql_df = spark.sql('select uid, lat, lgt, app_adjust_time from gps2 limit 5')
    # sql_df.show()
    print(type(sql_df))

    # save() with no explicit format writes parquet (the default data source)
    sql_df.write.save("data/GrMWKfDj9eIjsRuh.parquet")
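
As a quick sanity check we can read the output back with Spark; the relative path above typically ends up under the submitting user's HDFS home directory. A minimal sketch, assuming the SparkSession from the script above is still active:

# read back the parquet just written and verify it looks right
check_df = spark.read.parquet("data/GrMWKfDj9eIjsRuh.parquet")
check_df.printSchema()
print(check_df.count())   # should be 5, matching the LIMIT in the query above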

Version 2

# find pyspark and import
import os
import sys
import gc

os.environ['SPARK_HOME'] = "/opt/cloudera/parcels/SPARK2/lib/spark2"
sys.path.append(os.path.join("/home/ubuntu/data/pythonpackage"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python/lib/py4j-0.10.6-src.zip"))

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    print("success")
except ImportError as e:
    print("error importing spark modules", e)
    sys.exit(1)

# initialize the SparkSession (use spark.* config keys so the settings are actually applied)
spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName("gps") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executor.instances", 5) \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

# run Spark SQL to get a DataFrame
df = spark.sql("SELECT * FROM tmp.czn_userlocation_filteredby_black")
df.printSchema()
df.show(3)

# write to HDFS in parquet format
df.write.parquet("/user/akulaku/czn_userlocation_filteredby_black.parquet")

This gives us a parquet copy of the table on HDFS.
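
Note that df.write.parquet creates a directory with one part file per partition; if the table produces a large number of small files, coalescing first keeps the HDFS output (and the later rsync) easier to handle. A sketch, assuming the df from Version 2 and an arbitrary target of 8 files:

# reduce the number of output part files before writing
# (8 is an arbitrary choice; scale it with the data size)
df.coalesce(8) \
  .write \
  .mode("overwrite") \
  .parquet("/user/akulaku/czn_userlocation_filteredby_black.parquet")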

Version 3

Sometimes we can simply create a table in Hive or Impala, for example:

CREATE TABLE tmp.czn_usersensorlog_of_may AS
SELECT uid, apptime, value0, value1, value2
FROM ods.usersensorlog
WHERE month = "2018_05";

Running show create table shows that the resulting table is stored as TEXTFILE, and also reveals its HDFS location:

show create table tmp.czn_usersensorlog_of_may;

CREATE TABLE tmp.czn_usersensorlog_of_may (
    uid BIGINT,
    apptime BIGINT,
    value0 FLOAT,
    value1 FLOAT,
    value2 FLOAT
)
STORED AS TEXTFILE
LOCATION 'hdfs://nameservice1/user/hive/warehouse/tmp.db/czn_usersensorlog_of_may'
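
The same CTAS can also be issued from pyspark instead of the hive or impala shell; a sketch, assuming the SparkSession from Version 2:

# run the CTAS through Spark SQL instead of the hive/impala shell
spark.sql("""
    CREATE TABLE tmp.czn_usersensorlog_of_may AS
    SELECT uid, apptime, value0, value1, value2
    FROM ods.usersensorlog
    WHERE month = "2018_05"
""")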

Version 4

Sometimes we want to store the table as parquet, which makes it easier to read with pandas or dask. In that case we create the table first and then insert the data:

drop table if exists tmp.czn_userlocation_tmp_test;

CREATE TABLE tmp.czn_userlocation_tmp_test
(
    uid BIGINT,
    lat FLOAT,
    lgt FLOAT,
    apptime BIGINT
)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

INSERT OVERWRITE TABLE tmp.czn_userlocation_tmp_test
SELECT uid, lat, lgt, apptime
FROM kafka_table.userlocation2
WHERE apptime BETWEEN 1532156404000 AND 1532156764000;
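
If the data is already in a Spark DataFrame, an alternative to the explicit DDL is to let Spark create the parquet-backed table itself via saveAsTable; a sketch under that assumption, with the same columns as the DDL above:

# build the DataFrame with the same columns as the DDL above
df = spark.sql("""
    SELECT uid, lat, lgt, apptime
    FROM kafka_table.userlocation2
    WHERE apptime BETWEEN 1532156404000 AND 1532156764000
""")

# let Spark create the table as snappy-compressed parquet
df.write \
  .mode("overwrite") \
  .format("parquet") \
  .option("compression", "snappy") \
  .saveAsTable("tmp.czn_userlocation_tmp_test")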

With the HDFS path in hand, we can fetch the files directly with the hadoop fs -get command.

At this point the data only lives on HDFS; to download it, we first copy it from HDFS to the jump server:

hadoop fs -get /user/akulaku/czn_userlocation_filteredby_black.parquet chenzn/

Then copy it to the local machine with scp or the like. Compared with scp, rsync is better suited to transferring parquet output: it can compress data in flight, resume interrupted transfers, and so on:

rsync -P -z -r -e ssh akulaku@cdh-master4:~/chenzn/czn_userlocation_filteredby_black.parquet ~

To read the parquet files, use dask with the 'pyarrow' engine; it can comfortably handle datasets larger than memory:

import dask.dataframe as dd
df = dd.read_parquet(file_path, engine='pyarrow')
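
dask is lazy: read_parquet and any filters only build a task graph, and nothing is loaded until .compute() is called, which is what lets it work on data larger than memory. A small usage sketch, assuming the local path produced by the rsync step and the uid column from the tables above:

import os
import dask.dataframe as dd

# local path produced by the rsync step above (adjust as needed)
file_path = os.path.expanduser('~/czn_userlocation_filteredby_black.parquet')
df = dd.read_parquet(file_path, engine='pyarrow')

# these only build the task graph; nothing is read yet
row_count = df['uid'].count()
one_user = df[df['uid'] == 12345]        # 12345 is a made-up example uid

# .compute() triggers the actual, chunked read
print(row_count.compute())
print(one_user.compute().head())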

Appendix

The rsync options used above are listed below, adapted from "How to copy files with rsync over SSH" on Tutorials For Kyup.com:

  • --delete - delete files that don't exist on the sender (system)
  • -v - verbose (-vv will provide more detailed information)
  • -e "ssh options" - specify ssh as the remote shell
  • -a - archive mode - preserves permissions (owners, groups), times, symbolic links, and devices
  • -r - recurse into directories
  • -z - compress file data during transfer
  • --exclude 'foldername' - excludes the corresponding folder from the transfer
  • -P - show progress during transfer

🐶🐒