Custom thread pool in Java without the executor framework (with example)

By | January 20, 2016
  • Create a thread pool in Java without using the executor framework.
  • We will use a blocking queue to implement the thread pool.

What is a thread pool in Java?

  • A thread pool is a collection of threads that are created up front and reused to perform tasks.
  • Thread creation is a costly operation: each new thread needs its own stack and has to be set up by the JVM and the operating system.
    • It’s not advisable to create and destroy a thread for every task.
  • It’s recommended to reuse a pool of threads sized to our needs.
  • We will implement a custom thread pool using the following classes.
    1. BlockingQueue: stores the submitted tasks until a thread is free to run them.
    2. TaskExecutor: takes tasks from the queue and executes them.
    3. ThreadPool: creates the worker threads and enqueues submitted tasks to the blocking queue.
    4. TestTask: the task or operation that we want to execute.
    5. TestThreadPool: creates the tasks and submits them to the thread pool.

Fig 1: Execution of thread pool

Execution of thread pool in java

  1. The task producer generates a task.
  2. The task is submitted to the blocking queue (our custom implementation).
  3. An available thread (TaskExecutor) in the thread pool takes the task from the blocking queue.
  4. The thread executes and finishes the task.
  5. The thread becomes available to pick another task from the queue.
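
The five steps above can be sketched in one small, self-contained program. This is only an illustration, not the article's implementation: the class name FlowSketch, the unbounded ArrayDeque, and the STOP sentinel task used to end the workers are assumptions made so that the demo can terminate on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical demo class; not part of the article's code.
class FlowSketch {
    // Sentinel task that tells a worker to stop (an assumption of this sketch).
    private static final Runnable STOP = () -> { };

    private final Queue<Runnable> queue = new ArrayDeque<>();

    // Step 2: the producer hands a task to the shared queue.
    synchronized void submit(Runnable task) {
        queue.add(task);
        notifyAll(); // wake up workers waiting for work
    }

    // Step 3: a worker blocks here until a task is available.
    synchronized Runnable take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        return queue.remove();
    }

    // Runs taskCount tasks on workerCount threads; returns the sorted ids of finished tasks.
    static List<Integer> runDemo(int workerCount, int taskCount) {
        FlowSketch pool = new FlowSketch();
        List<Integer> done = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = pool.take();
                        if (task == STOP) {
                            return;
                        }
                        task.run(); // Step 4: execute and finish the task
                    }               // Step 5: loop back and pick the next task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Worker-" + i);
            worker.start();
            workers.add(worker);
        }
        for (int id = 1; id <= taskCount; id++) {
            final int taskId = id;
            pool.submit(() -> done.add(taskId)); // Step 1: generate a task
        }
        // One sentinel per worker, queued behind all real tasks.
        for (int i = 0; i < workerCount; i++) {
            pool.submit(STOP);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        Collections.sort(done);
        return done;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, 4)); // prints [1, 2, 3, 4]
    }
}
```

Which worker runs which task varies from run to run, so the sketch sorts the finished ids before returning them.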

Custom thread pool example without using executor framework

  • We use a custom blocking queue implementation to demonstrate a thread pool in Java.
  • The ThreadPool class encapsulates the custom BlockingQueue and TaskExecutor classes.

1.) ThreadPool class

  1. The ThreadPool class creates the requested number of TaskExecutor instances and starts one thread for each.
    • The TaskExecutor class is responsible for executing the tasks.
  2. The ThreadPool class exposes one method, submitTask.
    • submitTask is called by the task-generating program to submit a task to the thread pool.

package org.learn.Pool;

public class ThreadPool {

    // Bounded queue shared by all worker threads.
    private final BlockingQueue<Runnable> queue;

    public ThreadPool(int queueSize, int nThread) {
        queue = new BlockingQueue<>(queueSize);
        // Start nThread workers; each one takes tasks from the shared queue.
        for (int count = 0; count < nThread; count++) {
            String threadName = "Thread-" + count;
            TaskExecutor task = new TaskExecutor(queue);
            Thread thread = new Thread(task, threadName);
            thread.start();
        }
    }

    public void submitTask(Runnable task) throws InterruptedException {
        queue.enqueue(task);
    }
}

2.) TaskExecutor class:

  1. The TaskExecutor class implements the Runnable interface.
  2. Its run method dequeues a task from the queue (BlockingQueue), waiting if the queue is empty.
  3. It then executes the task and loops back to take the next one.

package org.learn.Pool;

public class TaskExecutor implements Runnable {
    private final BlockingQueue<Runnable> queue;

    public TaskExecutor(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String name = Thread.currentThread().getName();
                Runnable task = queue.dequeue();
                System.out.println("Task Started by Thread :" + name);
                task.run();
                System.out.println("Task Finished by Thread :" + name);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
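
The worker loop above only ends when dequeue throws InterruptedException, so the worker threads keep the JVM alive until they are interrupted. The following minimal sketch illustrates that behavior in isolation. The class name InterruptSketch and the plain lock object are assumptions of this example; lock.wait() merely stands in for a worker blocked in queue.dequeue() on an empty queue.

```java
// Hypothetical demo class; not part of the article's code.
class InterruptSketch {

    // Starts a worker that waits forever, interrupts it, and reports whether it stopped.
    static boolean runDemo() {
        final Object lock = new Object();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    synchronized (lock) {
                        // Stand-in for queue.dequeue() blocking on an empty queue.
                        lock.wait();
                    }
                }
            } catch (InterruptedException e) {
                // Leaving the loop ends run(), which ends the thread.
            }
        }, "Thread-0");
        worker.start();

        // Ask the worker to stop. If it has not reached wait() yet,
        // wait() throws as soon as it is called, because the interrupt flag is set.
        worker.interrupt();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + runDemo()); // prints Worker stopped: true
    }
}
```

A pool that needs to shut down could keep references to its threads and interrupt them in the same way; the ThreadPool class in this article does not do so.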

3.) BlockingQueue class to implement thread pool

  1. The BlockingQueue class is a simple bounded blocking queue.
  2. We use LinkedList as the underlying data structure.
  3. BlockingQueue has two synchronized methods:
    • enqueue: adds (pushes) a task to the queue, waiting while the queue is full.
    • dequeue: takes (pops) a task from the queue, waiting while the queue is empty.

package org.learn.Pool;

import java.util.LinkedList;
import java.util.Queue;

public class BlockingQueue<Type> {
    private Queue<Type> queue = new LinkedList<Type>();
    private int EMPTY = 0;
    private int MAX_TASK_IN_QUEUE = -1;

    public BlockingQueue(int size){
        this.MAX_TASK_IN_QUEUE = size;
    }

    public synchronized void enqueue(Type task)
            throws InterruptedException  {
        // Block the producer while the queue is full.
        while(this.queue.size() == this.MAX_TASK_IN_QUEUE) {
            wait();
        }
        // The queue is about to become non-empty: wake up waiting consumers.
        if(this.queue.size() == EMPTY) {
            notifyAll();
        }
        this.queue.offer(task);
    }

    public synchronized Type dequeue()
            throws InterruptedException{
        // Block the consumer while the queue is empty.
        while(this.queue.size() == EMPTY){
            wait();
        }
        // The queue is about to have free space: wake up waiting producers.
        if(this.queue.size() == this.MAX_TASK_IN_QUEUE){
            notifyAll();
        }
        return this.queue.poll();
    }
}
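
To see the blocking behavior on its own, here is a minimal, self-contained sketch. It is not the class above: BoundedQueueDemo and its nested BoundedQueue are hypothetical names, and for simplicity the sketch calls notifyAll on every change rather than only on the empty and full transitions. A producer thread tries to put three items into a queue of capacity 2, so its third enqueue has to wait until the main thread removes an item.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the article's code.
class BoundedQueueDemo {

    // Simplified bounded queue (assumption: notifies on every change).
    static class BoundedQueue<T> {
        private final Queue<T> items = new LinkedList<>();
        private final int capacity;

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        synchronized void enqueue(T item) throws InterruptedException {
            while (items.size() == capacity) {
                wait(); // queue full: the producer waits
            }
            items.offer(item);
            notifyAll();
        }

        synchronized T dequeue() throws InterruptedException {
            while (items.isEmpty()) {
                wait(); // queue empty: the consumer waits
            }
            T item = items.poll();
            notifyAll();
            return item;
        }
    }

    static List<String> runDemo() {
        BoundedQueue<String> queue = new BoundedQueue<>(2);
        AtomicBoolean thirdEnqueued = new AtomicBoolean(false);
        List<String> log = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                queue.enqueue("a");
                queue.enqueue("b");
                queue.enqueue("c"); // cannot complete until one item is removed
                thirdEnqueued.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            producer.start();
            Thread.sleep(200); // give the producer time to fill the queue
            log.add("blocked=" + !thirdEnqueued.get());
            log.add(queue.dequeue()); // frees one slot
            producer.join();
            log.add("blocked=" + !thirdEnqueued.get());
            log.add(queue.dequeue());
            log.add(queue.dequeue());
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [blocked=true, a, blocked=false, b, c]
    }
}
```

The items come out in the order they were put in, and the third enqueue completes only after the first dequeue has freed a slot.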

4.) TestTask Class 

  • TestTask simulates the task to be submitted to thread pool.

package org.learn.App;

public class TestTask implements Runnable {
    private int number;
    public TestTask(int number) {
        this.number = number;
    }
    @Override
    public void run() {
        System.out.println("Start executing of task number :"+ number);
        try {
            // Simulate the processing time of a real task
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("End executing of task number :"+ number);
    }
}

5.) TestThreadPool class to validate thread pool

  • The TestThreadPool class contains the main function used to test the thread pool.

package org.learn.App;

import org.learn.Pool.ThreadPool;

public class TestThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // Queue capacity: 3, number of worker threads: 4
        ThreadPool threadPool = new ThreadPool(3, 4);
        // Create 7 tasks and submit them to the pool
        for(int taskNumber = 1 ; taskNumber <= 7; taskNumber++) {
            TestTask task = new TestTask(taskNumber);
            threadPool.submitTask(task);
        }
    }
}

Output – thread pool example without executor framework


Task Started by Thread :Thread-2
Start executing of task number :2
Task Started by Thread :Thread-0
Start executing of task number :4
Task Started by Thread :Thread-1
Start executing of task number :3
Task Started by Thread :Thread-3
Start executing of task number :1
End executing of task number :2
End executing of task number :3
End executing of task number :4
Task Finished by Thread :Thread-0
Task Started by Thread :Thread-0
Task Finished by Thread :Thread-1
Task Finished by Thread :Thread-2
Task Started by Thread :Thread-1
Start executing of task number :6
End executing of task number :1
Start executing of task number :5
Task Finished by Thread :Thread-3
Task Started by Thread :Thread-2
Start executing of task number :7
End executing of task number :6
Task Finished by Thread :Thread-1
End executing of task number :7
Task Finished by Thread :Thread-2
End executing of task number :5
Task Finished by Thread :Thread-0
