import os
import threading
import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    """Worker helper: apply `func` to one DataFrame chunk, keeping its position."""
    df, func, num, kwargs = args
    return num, df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    """Apply `func` to `df` in parallel by splitting it across worker processes."""
    workers = kwargs.pop('workers', multiprocessing.cpu_count())
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df,
                      [(chunk, func, i, kwargs)
                       for i, chunk in enumerate(np.array_split(df, workers))])
    pool.close()
    pool.join()
    # Reassemble the chunks in their original order before concatenating.
    result = sorted(result, key=lambda x: x[0])
    return pd.concat([chunk for _, chunk in result])


def square(x):
    return x ** 2


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)


def rm_rf(d):
    """Recursively delete the directory `d` and everything under it."""
    for path in (os.path.join(d, f) for f in os.listdir(d)):
        if os.path.isdir(path):
            rm_rf(path)
        else:
            os.unlink(path)
    os.rmdir(d)


def create_dir(direc):
    """Create `direc`; if it already exists, wipe it and recreate it empty."""
    if not os.path.exists(direc):
        os.makedirs(direc)
    else:
        rm_rf(direc)
        create_dir(direc)


#################### Now make the data generator threadsafe ####################


class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by serializing
    calls to the `next` method of the given iterator/generator.
    """
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):  # Py3
        with self.lock:
            return next(self.it)

    def next(self):  # Py2
        with self.lock:
            return self.it.next()


def threadsafe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe."""
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g
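
# A minimal usage sketch for `threadsafe_generator` (not part of the original
# code): `count_batches` and `_worker` below are hypothetical names used only to
# illustrate how several threads can safely pull items from one shared,
# decorated generator, with the Lock in `threadsafe_iter` serializing next() calls.

@threadsafe_generator
def count_batches(n):
    # Plain generator; the decorator wraps the returned generator in threadsafe_iter.
    for i in range(n):
        yield i


def _worker(gen, out):
    # Drain the shared generator; each next() is taken under the iterator's lock.
    for item in gen:
        out.append(item)


if __name__ == '__main__':
    shared = count_batches(100)
    results = []
    threads = [threading.Thread(target=_worker, args=(shared, results))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Every item is consumed exactly once across the four threads.
    assert sorted(results) == list(range(100))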