How to Use MultithreadedMapper in MapReduce
A practical, developer-focused guide to using Hadoop’s MultithreadedMapper
for parallelizing map tasks and improving performance in CPU-bound jobs.
Introduction
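In a standard MapReduce job, each map task calls Mapper.map() from a single thread, so the key-value pairs in its input split are processed one after another. When the per-record map logic is slow (for example, heavy computation or calls to an external service), you can speed up processing by running multiple threads within a single map task using Hadoop’s MultithreadedMapper class (org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper).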
Why Use MultithreadedMapper?
- Parallelism: Utilize multiple CPU cores by running several threads per mapper task
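- Performance: Speed up slow map operations. Hadoop’s Javadoc recommends MultithreadedMapper mainly for map operations that are not CPU bound (for example, ones that wait on network or disk I/O); CPU-bound work only benefits if the map task has more than one CPU core available to it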
- Flexibility: Control the number of threads per mapper
Note: Your Mapper implementation must be thread-safe!
How MultithreadedMapper Works
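- MultithreadedMapper runs a configurable number of threads inside each map task
- Each thread creates its own instance of the configured Mapper and calls its map() method on records pulled from the task’s input split
- Reads from the shared record reader and writes to the shared output are synchronized, so threads process different key-value pairs in parallel, improving throughput
- The number of threads is configurable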
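The mechanics above can be reproduced without Hadoop. The sketch below is a plain-Java analogy, not Hadoop code: MiniMultithreadedMapper, its LineMapper interface and the word-count logic are hypothetical names. It mirrors three things MultithreadedMapper does: threads pull records from one shared reader under a lock, each thread gets its own mapper instance, and writes to the shared output are serialized.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class MiniMultithreadedMapper {

    // Hypothetical stand-in for a Hadoop Mapper: map one record, emit results into out.
    public interface LineMapper {
        void map(String line, List<String> out);
    }

    private final Iterator<String> input;                  // shared "record reader"
    private final List<String> output = new ArrayList<>(); // shared "record writer"

    public MiniMultithreadedMapper(List<String> records) {
        this.input = records.iterator();
    }

    // Only one thread at a time may advance the shared reader.
    private synchronized String nextRecord() {
        return input.hasNext() ? input.next() : null;
    }

    // Writes to the shared output are serialized.
    private void write(List<String> emitted) {
        synchronized (output) {
            output.addAll(emitted);
        }
    }

    public List<String> run(int numberOfThreads, Supplier<LineMapper> mapperFactory)
            throws InterruptedException {
        List<Thread> runners = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            LineMapper mapper = mapperFactory.get(); // one mapper instance per thread
            Thread t = new Thread(() -> {
                String line;
                while ((line = nextRecord()) != null) {
                    List<String> emitted = new ArrayList<>(); // local buffer for this record
                    mapper.map(line, emitted);
                    write(emitted);
                }
            });
            t.start();
            runners.add(t);
        }
        for (Thread t : runners) {
            t.join(); // wait for every worker before returning
        }
        synchronized (output) {
            return new ArrayList<>(output);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> lines = List.of("a b c", "b c", "c", "d e");
        // Word-count style mapper: emits "word\t1" for every token.
        List<String> pairs = new MiniMultithreadedMapper(lines).run(3, () -> (line, out) -> {
            for (String word : line.split("\\s+")) {
                out.add(word + "\t1");
            }
        });
        System.out.println(pairs.size() + " pairs emitted"); // prints "8 pairs emitted"
    }
}
```

The order of the emitted pairs varies from run to run because threads interleave, which is also true of a real multithreaded map task; only the set of pairs is deterministic.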
Configuring MultithreadedMapper in Your Job
Via Configuration Properties
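Configuration conf = new Configuration();
// Set the properties before creating the Job: Job.getInstance(conf) copies the
// Configuration, so changes made to conf afterwards are not seen by the job.
// getName() (not getCanonicalName()) gives the binary class name Hadoop can load,
// which matters for nested classes such as WordCount.WordCountMapper.
conf.set("mapred.map.multithreadedrunner.class", WordCountMapper.class.getName());
conf.set("mapred.map.multithreadedrunner.threads", "8");
Job job = Job.getInstance(conf, "multithreaded word count");
job.setJarByClass(WordCountMapper.class);
// The task runs MultithreadedMapper, which runs WordCountMapper in 8 threads
job.setMapperClass(MultithreadedMapper.class);
// ... set input/output paths and key/value classes here ...
job.waitForCompletion(true);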
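- mapred.map.multithreadedrunner.class: the actual Mapper class to run in parallel
- mapred.map.multithreadedrunner.threads: number of threads (default: 10)

In Hadoop 2.x and later these are deprecated names for mapreduce.mapper.multithreadedmapper.mapclass and mapreduce.mapper.multithreadedmapper.threads; both spellings are still accepted.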
Via MultithreadedMapper Methods
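job.setMapperClass(MultithreadedMapper.class);                  // the mapper the task runs
MultithreadedMapper.setMapperClass(job, WordCountMapper.class); // the mapper each thread runs
MultithreadedMapper.setNumberOfThreads(job, 8);                 // threads per map task
// Call these before job.waitForCompletion(true)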
Thread Safety Considerations
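- The map() method and any resources shared between threads must be thread-safe. MultithreadedMapper creates a separate Mapper instance for each thread, so ordinary instance fields are not shared, but static fields and any objects they reference (caches, clients, formatters) are
- Avoid mutable shared state in your Mapper
- Use local variables inside map() whenever possible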
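To make these guidelines concrete, here is a plain-Java sketch (no Hadoop dependency) of a mapper-like class written to be safe when several threads each run their own instance. TokenCountingMapper and its fields are hypothetical. It shows three kinds of state: locals inside map() need no protection, an instance field is safe because each thread owns its instance, and a static field is shared by all threads, so it uses an AtomicLong instead of a plain long.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TokenCountingMapper {

    // Static state is shared by every thread in the task, so it needs a thread-safe type.
    static final AtomicLong TOTAL_TOKENS = new AtomicLong();

    // Instance state is per-thread, because each thread gets its own mapper instance.
    private long recordsSeen = 0;

    public List<String> map(String line) {
        // Local variables live on the calling thread's stack and are never shared.
        List<String> emitted = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                emitted.add(word.toLowerCase());
            }
        }
        recordsSeen++;
        TOTAL_TOKENS.addAndGet(emitted.size());
        return emitted;
    }

    public long recordsSeen() {
        return recordsSeen;
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = 4;
        int recordsPerThread = 1000;
        List<Thread> runners = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            TokenCountingMapper mapper = new TokenCountingMapper(); // one instance per thread
            Thread t = new Thread(() -> {
                for (int r = 0; r < recordsPerThread; r++) {
                    mapper.map("the quick brown fox");
                }
            });
            t.start();
            runners.add(t);
        }
        for (Thread t : runners) {
            t.join();
        }
        // 4 threads x 1000 records x 4 tokens
        System.out.println(TOTAL_TOKENS.get()); // prints 16000
    }
}
```

Had TOTAL_TOKENS been a plain static long updated with +=, concurrent increments could be lost and the final total would vary between runs.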
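Example: Inside MultithreadedMapper.run()
The run() method below, lightly trimmed from Hadoop’s MultithreadedMapper source, shows how it manages threads internally: it starts one MapRunner thread per configured thread, waits for them in order, and rethrows the first recorded failure it finds: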
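/**
 * Run the application's maps using a thread pool.
 */
@Override
public void run(Context context) throws IOException, InterruptedException {
  // outer, mapClass and runners are fields of MultithreadedMapper
  outer = context;                                    // shared context used by all threads
  int numberOfThreads = getNumberOfThreads(context);  // configured thread count (default 10)
  mapClass = getMapperClass(context);                 // the Mapper class each thread instantiates
  runners = new ArrayList<MapRunner>(numberOfThreads);
  // Start the worker threads; each MapRunner creates its own Mapper instance
  for (int i = 0; i < numberOfThreads; ++i) {
    MapRunner thread = new MapRunner(context);
    thread.start();
    runners.add(i, thread);
  }
  // Join each thread in turn and rethrow the first failure found,
  // keeping IOException and InterruptedException as-is and wrapping anything else
  for (int i = 0; i < numberOfThreads; ++i) {
    MapRunner thread = runners.get(i);
    thread.join();
    Throwable th = thread.throwable;
    if (th != null) {
      if (th instanceof IOException) {
        throw (IOException) th;
      } else if (th instanceof InterruptedException) {
        throw (InterruptedException) th;
      } else {
        throw new RuntimeException(th);
      }
    }
  }
}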
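The join loop in run() follows a common pattern: a worker thread cannot throw a checked exception out of Thread.run(), so it stores the Throwable in a field, and the parent thread rethrows it after join(). The sketch below isolates that pattern in plain Java. CapturingRunnerDemo, CapturingRunner, Task and runAll are hypothetical names, and unlike Hadoop’s MapRunner the runner executes an arbitrary task rather than a Mapper.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CapturingRunnerDemo {

    // A unit of work that may throw checked exceptions (stand-in for running a mapper).
    public interface Task {
        void run(int threadIndex) throws Exception;
    }

    // Records any failure instead of letting it escape the thread.
    static class CapturingRunner extends Thread {
        private final Task task;
        private final int index;
        volatile Throwable throwable;

        CapturingRunner(Task task, int index) {
            this.task = task;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                task.run(index);
            } catch (Throwable t) {
                throwable = t;
            }
        }
    }

    // Starts n threads, joins them in start order, and rethrows the first recorded failure,
    // keeping IOException and InterruptedException and wrapping anything else.
    public static void runAll(int n, Task task) throws IOException, InterruptedException {
        List<CapturingRunner> runners = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            CapturingRunner r = new CapturingRunner(task, i);
            r.start();
            runners.add(r);
        }
        for (CapturingRunner r : runners) {
            r.join();
            Throwable th = r.throwable;
            if (th instanceof IOException) {
                throw (IOException) th;
            } else if (th instanceof InterruptedException) {
                throw (InterruptedException) th;
            } else if (th != null) {
                throw new RuntimeException(th);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            runAll(4, i -> {
                if (i == 2) {
                    throw new IOException("record in thread " + i + " failed");
                }
            });
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage()); // prints "caught: record in thread 2 failed"
        }
    }
}
```

As in MultithreadedMapper.run(), threads after the failing one are not joined before the exception propagates; they keep running until their own work ends.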
Use Cases
- HBase Bulk Loads: Speed up data loading in HBase using a map-only job
- CPU-bound Map Tasks: CPU-intensive map operations that can benefit from parallelism, provided the map task has more than one CPU core available to it
- Custom Data Processing: When you want to maximize CPU utilization within a single mapper
Caution: Ensure your downstream systems (e.g., HBase) can handle the increased load from the additional concurrent map threads.
Summary
- MultithreadedMapper allows you to run multiple threads per map task for parallel map processing
- Use it for slow or CPU-heavy map logic where thread safety can be guaranteed
- Configure the number of threads via properties or helper methods