Tips for Working with Large Datasets

Some recent competitions, such as the TalkingData AdTracking Fraud Detection Challenge on Kaggle, provide very large datasets. Generally speaking, a "small" machine with only 16 GB of RAM cannot load such a dataset directly. This post collects some tips for working with data of this size.

1. Delete unused variables promptly and garbage-collect

Feature engineering usually involves many transformations that create lots of intermediate variables. Besides del, calling gc.collect() is also a good option.

import gc
import pandas as pd

temp = pd.read_csv('../input/train_sample.csv')

#do something to the file
temp['os'] = temp['os'].astype('str')

#delete when no longer needed
del temp
#collect residual garbage
gc.collect()
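
To see how much an intermediate variable actually costs before you delete it, pandas can report a DataFrame's memory footprint. A small extra sketch (not from the original post; memory_usage is standard pandas, and it would be called before the del above):

#per-column memory in bytes; deep=True also counts the contents of object (string) columns
print(temp.memory_usage(deep=True))
#total footprint in MB
print('total: {:.1f} MB'.format(temp.memory_usage(deep=True).sum() / 1024 ** 2))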

2. Predefine data types

pandas normally infers data types on its own, but it tends to choose space-hungry ones. As the example below shows, predefining the dtypes saves more than half the memory.

dtypes = {
    'ip'            : 'uint32',
    'app'           : 'uint16',
    'device'        : 'uint16',
    'os'            : 'uint16',
    'channel'       : 'uint16',
    'is_attributed' : 'uint8',
}

dtypes2 = {
    'ip'            : 'int32',
    'app'           : 'int16',
    'device'        : 'int16',
    'os'            : 'int16',
    'channel'       : 'int16',
    'is_attributed' : 'int8',
}

train_sample_file = '../input/train_sample.csv'

#let pandas infer the datatypes
train = pd.read_csv(train_sample_file, parse_dates=['click_time'])

#check datatypes:
train.info()

#unsigned integer types
train = pd.read_csv(train_sample_file, dtype=dtypes, parse_dates=['click_time'])

#check datatypes:
train.info()

#signed integer types
train = pd.read_csv(train_sample_file, dtype=dtypes2, parse_dates=['click_time'])

#check datatypes:
train.info()

'''
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 8 columns):
ip 100000 non-null int64
app 100000 non-null int64
device 100000 non-null int64
os 100000 non-null int64
channel 100000 non-null int64
click_time 100000 non-null datetime64[ns]
attributed_time 227 non-null object
is_attributed 100000 non-null int64
dtypes: datetime64[ns](1), int64(6), object(1)
memory usage: 6.1+ MB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 8 columns):
ip 100000 non-null uint32
app 100000 non-null uint16
device 100000 non-null uint16
os 100000 non-null uint16
channel 100000 non-null uint16
click_time 100000 non-null datetime64[ns]
attributed_time 227 non-null object
is_attributed 100000 non-null uint8
dtypes: datetime64[ns](1), object(1), uint16(4), uint32(1), uint8(1)
memory usage: 2.8+ MB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 8 columns):
ip 100000 non-null int32
app 100000 non-null int16
device 100000 non-null int16
os 100000 non-null int16
channel 100000 non-null int16
click_time 100000 non-null datetime64[ns]
attributed_time 227 non-null object
is_attributed 100000 non-null int8
dtypes: datetime64[ns](1), int16(4), int32(1), int8(1), object(1)
memory usage: 2.8+ MB
'''
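
If a DataFrame was already loaded with the default types, it can still be shrunk after the fact. A minimal sketch (not from the original post; pd.to_numeric with downcast is standard pandas):

#downcast each integer column to the smallest unsigned type that fits its values
for col in ['ip', 'app', 'device', 'os', 'channel', 'is_attributed']:
    train[col] = pd.to_numeric(train[col], downcast='unsigned')
train.info()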

3. Use only specific rows of the CSV file

a) Specify the number of rows

Specify it directly with nrows:

train = pd.read_csv('../input/train.csv', nrows=100000, dtype=dtypes)

b) Skip rows

For example, skip the first 5,000,000 rows and read the next 1,000,000. Starting the skip range at 1 rather than 0 keeps the header row:

train = pd.read_csv('../input/train.csv', skiprows=range(1, 5000000), nrows=1000000, dtype=dtypes)

c) Sampling

import subprocess

print('# Line count:')
for file in ['train.csv', 'test.csv', 'train_sample.csv']:
    lines = subprocess.run(['wc', '-l', '../input/{}'.format(file)], stdout=subprocess.PIPE).stdout.decode('utf-8')
    print(lines, end='', flush=True)
'''
# Line count:
184903891 ../input/train.csv
18790470 ../input/test.csv
100001 ../input/train_sample.csv
'''

train.csv has lines = 184903891 lines in total (including the header). Suppose we want to sample 1,000,000 rows; then we need to skip lines - 1 - 1000000 of the data lines:

import numpy as np

#total number of lines in train.csv, counted above (includes the header)
lines = 184903891

#generate list of lines to skip
skiplines = np.random.choice(np.arange(1, lines), size=lines-1-1000000, replace=False)

#sort the list
skiplines = np.sort(skiplines)

#check our list
print('lines to skip:', len(skiplines))
print('remaining lines in sample:', lines-len(skiplines), '(remember that it includes the heading!)')

###################SANITY CHECK###################
#find lines that weren't skipped by checking the difference between consecutive entries
#how many of the first 100000 lines will be imported?
diff = skiplines[1:100000] - skiplines[2:100001]
remain = sum(diff != -1)
print('Ratio of lines from first 100000 lines:', '{0:.5f}'.format(remain/100000))
print('Ratio imported from all lines:', '{0:.5f}'.format((lines-len(skiplines))/lines))

train = pd.read_csv('../input/train.csv', skiprows=skiplines, dtype=dtypes)
train.head()

del skiplines
gc.collect()
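
Note that building a skip list of nearly 184 million indices is itself memory-hungry. As an alternative sketch (not from the original post), skiprows also accepts a callable, which samples a fraction of the rows without materializing any index list; the resulting row count is then approximate rather than exact:

import random

#keep on average 1000000 of the lines - 1 data rows; row 0 is the header and is never skipped
keep_prob = 1000000 / (lines - 1)
train = pd.read_csv('../input/train.csv',
                    skiprows=lambda i: i > 0 and random.random() > keep_prob,
                    dtype=dtypes)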

4. Use pandas' chunk iterator to process the file in pieces

Here we use np.where to filter out the rows where 'is_attributed' is 0 (np.where(condition, x, y) behaves like [xv if c else yv for (c, xv, yv) in zip(condition, x, y)]):

#set up an empty dataframe
df_converted = pd.DataFrame()

#we are going to work with chunks of size 1 million rows
chunksize = 10 ** 6

#in each chunk, filter for values that have 'is_attributed'==1, and merge these values into one dataframe
for chunk in pd.read_csv('../input/train.csv', chunksize=chunksize, dtype=dtypes):
    filtered = chunk[np.where(chunk['is_attributed'] == 1, True, False)]
    df_converted = pd.concat([df_converted, filtered], ignore_index=True)
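
Inside the loop, the np.where detour is equivalent to plain boolean indexing, which is the more idiomatic spelling:

filtered = chunk[chunk['is_attributed'] == 1]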

5. Load only the needed columns

#wanted columns
columns = ['ip', 'click_time', 'is_attributed']
dtypes = {
    'ip'            : 'uint32',
    'is_attributed' : 'uint8',
}

ips_df = pd.read_csv('../input/train.csv', usecols=columns, dtype=dtypes)
print(ips_df.info())
ips_df.head()
'''
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 184903890 entries, 0 to 184903889
Data columns (total 3 columns):
ip uint32
click_time object
is_attributed uint8
dtypes: object(1), uint32(1), uint8(1)
memory usage: 2.2+ GB
None
'''

6. Combine multiple techniques to handle the data creatively

For example, if the full dataset is too large to groupby in one go, do it in batches:


size = 100000
all_rows = len(ips_df)
num_parts = all_rows // size

#generate the first batch
ip_sums = ips_df[0:size][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()

#add remaining batches (range goes up to num_parts inclusive so the tail rows are not dropped)
for p in range(1, num_parts + 1):
    start = p * size
    end = p * size + size
    if end < all_rows:
        group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
    else:
        group = ips_df[start:][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
    #merge the batch into the running totals and collapse the two sum columns
    ip_sums = ip_sums.merge(group, on='ip', how='outer')
    ip_sums.columns = ['ip', 'sum1', 'sum2']
    ip_sums['conversions_per_ip'] = np.nansum((ip_sums['sum1'], ip_sums['sum2']), axis=0)
    ip_sums.drop(columns=['sum1', 'sum2'], inplace=True)
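
An equivalent and somewhat simpler pattern (my own sketch, not from the original post) collects the per-batch group sums and reduces them with one final groupby, since partial sums can safely be summed again:

#group each batch separately, stack the partial results, then sum the partial sums per ip
parts = [ips_df[start:start + size][['ip', 'is_attributed']]
             .groupby('ip', as_index=False).sum()
         for start in range(0, all_rows, size)]
conversions = pd.concat(parts).groupby('ip', as_index=False).sum()
conversions.columns = ['ip', 'conversions_per_ip']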

7. Use dask instead of pandas

import dask
import dask.dataframe as dd
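
dask.dataframe mirrors much of the pandas API but evaluates lazily and out of core, so the full train.csv never has to fit in memory at once. A minimal sketch (it reuses the dtypes dict from section 2; not from the original post):

#read lazily in blocks; nothing is actually loaded yet
train_dd = dd.read_csv('../input/train.csv', dtype=dtypes)

#this only builds a task graph for the aggregation
conversions = train_dd.groupby('ip')['is_attributed'].sum()

#compute() triggers the real block-wise, parallel execution
result = conversions.compute()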