We use PySpark to read a table from Hive, write it out to HDFS as Parquet, `hadoop fs -get` it onto the jump host, rsync it to the local machine, and finally read it into a DataFrame with Dask.
Version 1

```python
from pyspark.sql import *
import pyspark

if __name__ == '__main__':
    spark = SparkSession.builder \
        .master("yarn") \
        .appName("pyspark location process") \
        .enableHiveSupport() \
        .getOrCreate()
    sc = spark.sparkContext

    spark.sql('use annals').show()

    spark.sql('select * from gps2 limit 1').show()
    sql_df = spark.sql('select uid, lat, lgt, app_adjust_time from gps2 limit 5')

    print(type(sql_df))
    sql_df.write.save("data/GrMWKfDj9eIjsRuh.parquet")
```
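For a toy query like the `limit 5` above, the HDFS detour is not strictly necessary; a minimal alternative sketch (assuming the same `spark` session, that the result fits comfortably in driver memory, and with the local file name `gps2_sample.parquet` made up for illustration):

```python
# A small result like this can be collected straight to the driver as pandas;
# only do this when the data comfortably fits in driver memory.
pdf = sql_df.toPandas()
print(pdf.head())
# Writing Parquet locally requires pyarrow or fastparquet on the driver.
pdf.to_parquet("gps2_sample.parquet")
```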
Version 2

```python
import os
import sys
import gc

# Point at the Cloudera Spark 2 installation and make PySpark importable.
os.environ['SPARK_HOME'] = "/opt/cloudera/parcels/SPARK2/lib/spark2"
sys.path.append(os.path.join("/home/ubuntu/data/pythonpackage"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python/lib/py4j-0.10.6-src.zip"))

try:
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    print("success")
except ImportError as e:
    print("error importing spark modules", e)
    sys.exit(1)

spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName("gps") \
    .config("spark.submit.deployMode", "client") \
    .config("num-executors", 5) \
    .config("executor-cores", 4) \
    .config("executor-memory", "2g") \
    .config("driver-memory", "1g") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

df = spark.sql("SELECT * FROM tmp.czn_userlocation_filteredby_black")
df.printSchema()
df.show(3)

# Write the result to HDFS as Parquet.
df.write.parquet("/user/akulaku/czn_userlocation_filteredby_black.parquet")
```
This gives us a Parquet file on HDFS.
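Before moving on, it is worth a quick sanity check that the write succeeded; a small sketch, assuming the same `spark` session as in the script above:

```python
# Read the Parquet output back from HDFS and confirm schema and row count.
check_df = spark.read.parquet("/user/akulaku/czn_userlocation_filteredby_black.parquet")
check_df.printSchema()
print(check_df.count())
```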
Version 3

Sometimes we can simply create a table in Hive or Impala instead, for example:
```sql
create table tmp.czn_usersensorlog_of_may as
SELECT uid, apptime, value0, value1, value2
FROM ods.usersensorlog
WHERE month = "2018_05"
```
Running `show create table tmp.czn_usersensorlog_of_may` gives:

```sql
CREATE TABLE tmp.czn_usersensorlog_of_may (
  uid BIGINT,
  apptime BIGINT,
  value0 FLOAT,
  value1 FLOAT,
  value2 FLOAT
)
STORED AS TEXTFILE
LOCATION 'hdfs://nameservice1/user/hive/warehouse/tmp.db/czn_usersensorlog_of_may'
```
Version 4

Sometimes we need to explicitly store the table as Parquet (the table above defaulted to TEXTFILE), which makes it much easier to read with pandas or Dask. That can be set up as follows; here we create the table first and then insert the data:
```sql
drop table if exists tmp.czn_userlocation_tmp_test;

CREATE TABLE tmp.czn_userlocation_tmp_test (
  uid BIGINT,
  lat FLOAT,
  lgt FLOAT,
  apptime BIGINT
)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

INSERT OVERWRITE TABLE tmp.czn_userlocation_tmp_test
SELECT uid, lat, lgt, apptime
FROM kafka_table.userlocation2
WHERE apptime between 1532156404000 and 1532156764000;
```
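The HDFS location of the new table can be looked up from the metastore; a sketch using the `spark` session from Version 2 (the exact layout of the `DESCRIBE FORMATTED` output can vary between Hive/Spark versions):

```python
# DESCRIBE FORMATTED includes a "Location" row with the table's HDFS path.
info = spark.sql("DESCRIBE FORMATTED tmp.czn_userlocation_tmp_test")
info.filter("col_name = 'Location'").show(truncate=False)
```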
Now that we have the HDFS path, we can fetch the data directly with the HDFS `get` command. At this point the data still lives only on HDFS; to download it, we first copy it from HDFS onto the jump host:
```bash
hadoop fs -get /user/akulaku/czn_userlocation_filteredby_black.parquet chenzn/
```
Then copy it to the local machine with scp or similar. Compared with scp, though, rsync is better suited to transferring Parquet data: it can compress data in transit, resume interrupted transfers, and so on. For example:
```bash
rsync -P -z -r -e ssh akulaku@cdh-master4:~/chenzn/czn_userlocation_filteredby_black.parquet ~
```
The Parquet data can then be read with Dask using the 'pyarrow' engine, which handles datasets larger than memory without trouble:
```python
import dask.dataframe as dd

df = dd.read_parquet(file_path, engine='pyarrow')
```
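A short usage sketch follows; Dask is lazy, so the heavy reading only happens when a result is actually computed. The local path and the `uid` column are assumptions based on the earlier steps:

```python
import dask.dataframe as dd

# Path to the Parquet data rsync'ed into the home directory in the previous step.
df = dd.read_parquet("czn_userlocation_filteredby_black.parquet", engine='pyarrow')

print(df.columns)   # metadata only, no data read yet
print(df.head())    # reads just a few partitions
print(len(df))      # triggers a full pass over the data
# Example aggregation; .compute() materialises the result as a pandas object.
print(df.groupby('uid').size().compute())
```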
Appendix

The rsync options used above are as follows (taken from How to copy files with rsync over SSH - Tutorials For Kyup.com):
--delete - delete files that don't exist on the sender (system)
-v - verbose (-vv will provide more detailed information)
-e "ssh options" - specify ssh as the remote shell
-a - archive mode; preserves permissions (owners, groups), times, symbolic links, and devices
-r - recurse into directories
-z - compress file data during transfer
--exclude 'foldername' - excludes the corresponding folder from the transfer
-P - show progress during transfer