Java: Executor Framework

Prateek
4 min read · Jun 9, 2021

This post walks through a simple example that reads records from a flat file and saves them to a database using the Java Executor framework.

public static ExecutorService newFixedThreadPool(int nThreads): creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be actively processing tasks. If additional tasks are submitted when all threads are busy, they wait in the queue until a thread becomes available. If a thread terminates due to a failure during execution prior to shutdown, a new one takes its place if needed to execute subsequent tasks. The threads in the pool exist until the pool is explicitly shut down.
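
To make the "at most nThreads" guarantee concrete, here is a minimal sketch. The class name FixedPoolDemo and the helper workerNames are made up for this illustration; the tasks only record which worker thread ran them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the example application.
class FixedPoolDemo {

    // Runs taskCount trivial tasks on a pool of nThreads and returns the
    // names of the worker threads that actually executed them.
    static Set<String> workerNames(int nThreads, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(() -> names.add(Thread.currentThread().getName()));
            }
            // invokeAll blocks until every task has completed.
            for (Future<Boolean> future : pool.invokeAll(tasks)) {
                future.get(); // surfaces any exception thrown by a task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten tasks, but never more than two distinct worker threads.
        Set<String> names = workerNames(2, 10);
        System.out.println("Distinct worker threads used: " + names.size());
    }
}
```

However many tasks are submitted, the set of thread names never grows beyond the pool size; the extra tasks simply wait in the pool's queue.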

Java Queue Categories

Java ships with many Queue implementations. We can broadly categorize them into the following two types:

  • Bounded Queues
  • Unbounded Queues

Bounded Queues are limited by capacity, which means the maximum size of the queue must be provided when it is created. For example, ArrayBlockingQueue.

Unbounded Queues are not limited by capacity, so no size is provided when they are created. For example, LinkedList.

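The Queue implementations in the java.util package (such as LinkedList, ArrayDeque and PriorityQueue) are unbounded. The java.util.concurrent package contains both kinds: ArrayBlockingQueue is always bounded, LinkedBlockingQueue is optionally bounded (its default capacity is Integer.MAX_VALUE), and ConcurrentLinkedQueue and PriorityBlockingQueue are unbounded.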
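
The difference is easy to see with a small sketch. The class BoundedVsUnbounded and its accepted helper are illustrative names only; the code offers five elements to a bounded ArrayBlockingQueue of capacity 3 and to an unbounded LinkedList.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the example application.
class BoundedVsUnbounded {

    // Offers `attempts` elements and returns how many the queue accepted.
    static int accepted(Queue<Integer> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            // offer returns false (rather than throwing) when the queue is full
            if (queue.offer(i)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<Integer> bounded = new ArrayBlockingQueue<>(3); // capacity fixed at creation
        Queue<Integer> unbounded = new LinkedList<>();        // no capacity argument
        System.out.println("bounded accepted:   " + accepted(bounded, 5));   // prints 3
        System.out.println("unbounded accepted: " + accepted(unbounded, 5)); // prints 5
    }
}
```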

Alternatively, we can categorize them into the following two types:

  • Blocking Queues
  • Non-Blocking Queues

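All queues that implement the BlockingQueue interface are blocking queues; the rest are non-blocking queues.

A blocking queue can make the calling thread wait until the operation can proceed (space becomes available for an insert, or an element becomes available for a removal) or until a timeout elapses. A non-blocking queue never waits: its operations return immediately.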
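
As a rough illustration of that difference (the class and method names below are hypothetical), poll() on an empty ConcurrentLinkedQueue returns null straight away, while take() on an empty LinkedBlockingQueue waits for a producer thread that arrives late.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the example application.
class BlockingVsNonBlocking {

    // Non-blocking: poll() on an empty queue returns null immediately.
    static String pollEmptyNonBlocking() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        return queue.poll();
    }

    // Blocking: take() waits until the producer thread puts an element.
    static String takeFromLateProducer() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // simulate a slow producer
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String value = queue.take(); // waits here until "hello" arrives
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-blocking poll: " + pollEmptyNonBlocking()); // prints null
        System.out.println("blocking take:     " + takeFromLateProducer()); // prints hello
    }
}
```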

Some Queues are Deques and some queues are PriorityQueues.

BlockingQueue Operations

In addition to the two forms of operations that Queue provides (throwing an exception or returning a special value), BlockingQueue supports two more forms (blocking and timing out), as shown below.

Operation   Throws exception   Special value   Blocks   Times out
Insert      add(e)             offer(e)        put(e)   offer(e, time, unit)
Remove      remove()           poll()          take()   poll(time, unit)
Examine     element()          peek()          N/A      N/A

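The blocking forms (put and take) wait indefinitely until the operation can proceed, while the timed forms (offer(e, time, unit) and poll(time, unit)) wait only until the given timeout elapses.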
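
The four insert forms can be tried against a full queue of capacity 1. This is only a sketch; InsertForms and its helper methods are names invented for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the example application.
class InsertForms {

    // Returns a queue of capacity 1 that is already full.
    static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        return queue;
    }

    // "Throws exception": add(e) throws IllegalStateException when full.
    static boolean addThrows() {
        try {
            fullQueue().add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // "Special value": offer(e) returns false when full.
    static boolean offerResult() {
        return fullQueue().offer("second");
    }

    // "Times out": offer(e, time, unit) waits up to the timeout, then returns false.
    static boolean timedOfferResult() {
        try {
            return fullQueue().offer("second", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // "Blocks": put(e) waits until a consumer frees the single slot.
    static String putThenPeek() {
        BlockingQueue<String> queue = fullQueue();
        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                queue.take(); // removes "first" and frees the slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("second"); // waits until the consumer has taken "first"
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println("add throws:  " + addThrows());        // prints true
        System.out.println("offer:       " + offerResult());      // prints false
        System.out.println("timed offer: " + timedOfferResult()); // prints false
        System.out.println("after put:   " + putThenPeek());      // prints second
    }
}
```

The remove operations behave symmetrically on an empty queue.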

That concludes this quick roundup of queues in Java. I hope it helps you get started with the Queue collections. The rest of this post is the executor example itself.

User.java

package com.app.beans;

// Simple bean holding one record from the input file.
public class User {
    private int id;
    private String name;
    private String emailAddress;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getEmailAddress() {
        return emailAddress;
    }

    public void setEmailAddress(String emailAddress) {
        this.emailAddress = emailAddress;
    }
}

DBConnection.java

package com.app.dao;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DBConnection {

    // Opens a new connection to the local MySQL "test" schema.
    // Returns null if the driver is missing or the connection fails.
    public static Connection getConnection() {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            return DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
        } catch (ClassNotFoundException | SQLException ex) {
            Logger.getLogger(DBConnection.class.getName()).log(Level.SEVERE, null, ex);
            return null;
        }
    }
}

UserDao.java

package com.app.dao;

import com.app.beans.User;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class UserDao {

    // Inserts one user and returns the number of affected rows (0 on failure).
    public int saveUser(User user) {
        int rows = 0;
        // try-with-resources closes the statement and the connection after each
        // insert; the original version never closed them.
        try (Connection connection = DBConnection.getConnection();
             PreparedStatement statement = connection.prepareStatement("insert into user values(?,?,?)")) {
            statement.setInt(1, user.getId());
            statement.setString(2, user.getName());
            statement.setString(3, user.getEmailAddress());
            rows = statement.executeUpdate();
        } catch (SQLException ex) {
            Logger.getLogger(UserDao.class.getName()).log(Level.SEVERE, null, ex);
        }
        return rows;
    }
}

UserProcessor.java

package com.app.runnables;

import com.app.beans.User;
import com.app.dao.UserDao;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;

// Task that parses one line of the input file and saves it through the DAO.
public class UserProcessor implements Callable<Integer> {
    private final String userRecord;
    private final UserDao dao;

    public UserProcessor(String userRecord, UserDao dao) {
        this.userRecord = userRecord;
        this.dao = dao;
    }

    @Override
    public Integer call() throws Exception {
        int rows = 0;
        System.out.println(Thread.currentThread().getName() + " processing record for : " + userRecord);
        // Each record has the form: emailAddress,name,id
        StringTokenizer tokenizer = new StringTokenizer(userRecord, ",");
        User user = null;
        while (tokenizer.hasMoreTokens()) {
            user = new User();
            user.setEmailAddress(tokenizer.nextToken());
            user.setName(tokenizer.nextToken());
            user.setId(Integer.parseInt(tokenizer.nextToken().trim()));
            rows = dao.saveUser(user);
        }
        return rows; // number of rows inserted for this record
    }
}

TestExecutors.java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.app.dao.UserDao;
import com.app.runnables.UserProcessor;

public class TestExecutors {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        List<String> users = getUsersFromFile("C:\\new_users.txt");
        UserDao dao = new UserDao();
        for (String user : users) {
            Future<Integer> future = service.submit(new UserProcessor(user, dao));
            try {
                // get() blocks until this task has finished, so the next record
                // is submitted only after the current one has been saved.
                System.out.println("Result of the operation is: " + future.get());
            } catch (InterruptedException | ExecutionException ex) {
                Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        // Stop accepting new tasks; without this the pool threads keep the JVM alive.
        service.shutdown();
        System.out.println("Main execution over!");
    }

    // Reads the file line by line; each line is one user record.
    public static List<String> getUsersFromFile(String fileName) {
        List<String> users = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                users.add(line);
            }
        } catch (IOException ex) {
            // FileNotFoundException is a subclass of IOException and is handled here too.
            Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
        }
        return users;
    }
}

db.sql

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `emailAddress` varchar(255) DEFAULT NULL
);

new_users.txt

henry@gmail.com,Henry Cavell,1001
brad@gmail.com,Brad Pitt,1002
leo@gmail.com,Leonardo DiCaprio,1003
matt@gmail.com,Matt Damon,1004
george@gmail.com,George Clooney,1005
daniel@gmail.com,Daniel Radcliffe,1006
emma@gmail.com,Emma Watson,1007
tom@gmail.com,Tom Hanks,1008
tom2@gmail.com,Tom Cruise,1009
christian@gmail.com,Christian Bale,1010

Here is the output:

pool-1-thread-1 processing record for : henry@gmail.com,Henry Cavell,1001
Result of the operation is: 1
pool-1-thread-2 processing record for : brad@gmail.com,Brad Pitt,1002
Result of the operation is: 1
pool-1-thread-3 processing record for : leo@gmail.com,Leonardo DiCaprio,1003
Result of the operation is: 1
pool-1-thread-4 processing record for : matt@gmail.com,Matt Damon,1004
Result of the operation is: 1
pool-1-thread-5 processing record for : george@gmail.com,George Clooney,1005
Result of the operation is: 1
pool-1-thread-1 processing record for : daniel@gmail.com,Daniel Radcliffe,1006
Result of the operation is: 1
pool-1-thread-2 processing record for : emma@gmail.com,Emma Watson,1007
Result of the operation is: 1
pool-1-thread-3 processing record for : tom@gmail.com,Tom Hanks,1008
Result of the operation is: 1
pool-1-thread-4 processing record for : tom2@gmail.com,Tom Cruise,1009
Result of the operation is: 1
pool-1-thread-5 processing record for : christian@gmail.com,Christian Bale,1010
Result of the operation is: 1
Main execution over!
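
Notice that every "processing record" line is followed immediately by its result. That is because main calls future.get() right after each submit, so it waits for one task to finish before handing over the next, and the five threads never actually work at the same time. One possible variation, sketched below under some assumptions, is to submit every task first and collect the results afterwards. SubmitThenCollect and parseId are hypothetical names, and parseId merely stands in for the database insert so that the sketch runs without MySQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the example application.
class SubmitThenCollect {

    // Stand-in for UserProcessor.call(): returns the id (third field)
    // instead of writing to a database.
    static int parseId(String record) {
        String[] fields = record.split(",");
        return Integer.parseInt(fields[2].trim());
    }

    static List<Integer> processAll(List<String> records, int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // Phase 1: hand every record to the pool without waiting.
            List<Future<Integer>> futures = new ArrayList<>();
            for (String record : records) {
                Callable<Integer> task = () -> parseId(record);
                futures.add(service.submit(task));
            }
            // Phase 2: collect the results in submission order.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "henry@gmail.com,Henry Cavell,1001",
                "brad@gmail.com,Brad Pitt,1002",
                "leo@gmail.com,Leonardo DiCaprio,1003");
        System.out.println(processAll(records, 5)); // prints [1001, 1002, 1003]
    }
}
```

With this structure the tasks can overlap, so in the real application the "processing record" lines from different threads would be free to interleave, while the results still come back in the order the records were submitted.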
