75 lines
1.8 KiB
Python
75 lines
1.8 KiB
Python
|
|
import os
|
||
|
|
import threading
|
||
|
|
|
||
|
|
import multiprocessing
|
||
|
|
import pandas as pd
|
||
|
|
import numpy as np
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def _apply_df(args):
|
||
|
|
df, func, num, kwargs = args
|
||
|
|
return num, df.apply(func, **kwargs)
|
||
|
|
|
||
|
|
def apply_by_multiprocessing(df, func, **kwargs):
    """Run ``df.apply(func, **kwargs)`` in parallel over row-wise chunks.

    The frame is cut into ``workers`` consecutive row chunks, each chunk is
    processed by ``_apply_df`` in a separate process, and the per-chunk
    results are concatenated back together in the original chunk order.

    Args:
        df: The pandas object to process; it must support ``len()``,
            ``.iloc`` and ``.apply``.
        func: Callable forwarded to ``apply``. It is shipped to the worker
            processes through ``Pool.map``, so it has to be picklable
            (e.g. a module-level function).
        **kwargs: Forwarded to ``apply``. The optional ``workers`` key is
            consumed here and sets the number of processes/chunks; it
            defaults to ``multiprocessing.cpu_count()``.

    Returns:
        ``pd.concat`` of the per-chunk ``apply`` results.
    """
    workers = kwargs.pop('workers', multiprocessing.cpu_count())

    # Split row *positions* rather than the frame itself: the chunk sizes
    # are the same as np.array_split(df, workers), but this does not rely
    # on how NumPy handles a DataFrame argument.
    row_groups = np.array_split(np.arange(len(df)), workers)
    tasks = [
        (df.iloc[rows], func, position, kwargs)
        for position, rows in enumerate(row_groups)
    ]

    pool = multiprocessing.Pool(processes=workers)
    try:
        # Pool.map returns results in task order, so no re-sorting is needed.
        results = pool.map(_apply_df, tasks)
    finally:
        # Always shut the pool down, even when a worker raised, so that no
        # worker processes are left behind.
        pool.close()
        pool.join()

    return pd.concat([chunk_result for _, chunk_result in results])
|
||
|
|
|
||
|
|
def square(x):
    """Return ``x`` squared.

    Works for anything that supports ``**`` with an integer exponent
    (plain numbers as well as NumPy arrays / pandas objects, element-wise).
    """
    # The exponent must be the constant 2; ``x ** x`` raises x to its own
    # power, which is not a square (e.g. 3 ** 3 == 27).
    return x ** 2
|
||
|
|
|
||
|
|
if __name__ == '__main__':
    # Demo run: build a 10-row frame with two identical integer columns and
    # apply ``square`` to every row (axis=1) using 4 worker processes.
    # The result is not used.
    demo_frame = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(demo_frame, square, workers=4, axis=1)
|
||
|
|
|
||
|
|
|
||
|
|
def rm_rf(d):
    """Recursively delete directory ``d`` and everything beneath it.

    Symbolic links found inside the tree are removed as links; the
    directories they point to are never entered, so data outside ``d`` is
    left untouched.

    Args:
        d: Path of the directory to remove.

    Raises:
        OSError: If ``d`` does not exist, is not a directory, is itself a
            symbolic link, or if any entry cannot be removed.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import shutil

    # shutil.rmtree replaces the hand-written walk, which used
    # os.path.isdir() (follows symlinks) and therefore descended into
    # symlinked directories and deleted their targets' contents.
    shutil.rmtree(d)
|
||
|
|
|
||
|
|
def create_dir(direc):
    """Make sure ``direc`` exists as a freshly created directory.

    If the path already exists it is removed with ``rm_rf`` first, so any
    previous contents are discarded; the directory (including missing
    parents) is then created with ``os.makedirs``.

    Args:
        direc: Path of the directory to (re)create.
    """
    if os.path.exists(direc):
        # Wipe the old tree; once it is gone we fall through to the same
        # creation step that is used for a path that never existed.
        rm_rf(direc)
    os.makedirs(direc)
|
||
|
|
|
||
|
|
|
||
|
|
#################### Now make the data generator threadsafe ####################
|
||
|
|
|
||
|
|
class threadsafe_iter(object):
    """Wrap an iterator/generator so it can be shared between threads.

    Every call that advances the wrapped iterator is made while holding a
    lock, so only one thread at a time is inside the iterator's ``next``.

    Args:
        it: The iterator or generator object to wrap.
    """

    def __init__(self, it):
        self.it = it
        # Guards every advance of ``self.it``.
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):  # Py3
        """Return the next item of the wrapped iterator, under the lock.

        Raises:
            StopIteration: When the wrapped iterator is exhausted.
        """
        with self.lock:
            # The builtin next() works for both Python 2 and Python 3
            # iterators, unlike calling ``self.it.next()`` directly.
            return next(self.it)

    # Py2 spelling of the iterator protocol; also keeps explicit
    # ``obj.next()`` calls working on Python 3.
    next = __next__
|
||
|
|
|
||
|
|
|
||
|
|
def threadsafe_generator(f):
    """Decorator that makes a generator function thread-safe.

    Calling the decorated function calls ``f`` with the same arguments and
    wraps the iterator it returns in ``threadsafe_iter``.

    Args:
        f: The generator function to decorate.

    Returns:
        A wrapper function that carries ``f``'s name and docstring.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import functools

    @functools.wraps(f)  # keep f's __name__/__doc__ on the wrapper
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))

    return g
|