I tried to develop a simple example which reads data from the flatfile using Java Executors.
public static ExecutorService newFixedThreadPool(int nThreads) — Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
Java Queue Categories
In Java, we can find many Queue implementations. W can broadly categorize them into the following two types
- Bounded Queues
- Unbounded Queues
Bounded Queues — are queues which are bounded by capacity that means we need to provide the max size of the queue at the time of creation. For example ArrayBlockingQueue (see previous example).
Unbounded Queues — are queues which are NOT bounded by capacity that means we should not provide the size of the queue. For example LinkedList (see previous example).
All Queues which are available in java.util package are Unbounded Queues and Queues which are available in java.util.concurrent package are Bounded Queues.
In other ways, W can broadly categorize them into the following two types:
- Blocking Queues
- Non-Blocking Queues
All Queues which implement BlockingQueue interface are BlockingQueues and rest are Non-Blocking Queues.
BlockingQueues blocks until it finishes it’s job or time out, but Non-BlockingQueues do not.
Some Queues are Deques and some queues are PriorityQueues.
BlockingQueue Operations
In addition to Queue’s two forms of operations, BlockingQueue’s supports two more forms as shown below.
OperationThrows exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()N/AN/A
Some operations are blocked until it finishes it’s job and other are blocked until time out.
That’s all of a quick roundup on Queue in Java. I hope these Java Queue examples will help you in getting started with Queue collection programming.
User.java
public class User {
private int id;
private String name;
private String emailAddress;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmailAddress() {
return emailAddress;
}
public void setEmailAddress(String emailAddress) {
this.emailAddress = emailAddress;
}
}
DBConnection.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DBConnection { public static Connection getConnection() {
Connection connection = null;
try {
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
return connection;
} catch (ClassNotFoundException ex) {
Logger.getLogger(DBConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (SQLException ex) {
Logger.getLogger(DBConnection.class.getName()).log(Level.SEVERE, null, ex);
}
return connection;
}
}
UserDao.java
import com.app.beans.User;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class UserDao {
public int saveUser(User user) {
int rows = 0;
try {
Connection connection = DBConnection.getConnection();
PreparedStatement statement = connection.prepareStatement("insert into user values(?,?,?)");
statement.setInt(1, user.getId());
statement.setString(2, user.getName());
statement.setString(3, user.getEmailAddress());
rows = statement.executeUpdate();
} catch (SQLException ex) {
Logger.getLogger(UserDao.class.getName()).log(Level.SEVERE, null, ex);
}
return rows;
}
}
UserProcessor.java
import com.app.beans.User;
import com.app.dao.UserDao;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
public class UserProcessor implements Callable<Integer> {
private String userRecord;
private UserDao dao;
public UserProcessor(String userRecord, UserDao dao) {
this.userRecord = userRecord;
this.dao = dao;
}
@Override
public Integer call() throws Exception {
int rows = 0;
System.out.println(Thread.currentThread().getName() + " processing record for : " + userRecord);
StringTokenizer tokenizer = new StringTokenizer(userRecord, ",");
User user = null;
while (tokenizer.hasMoreTokens()) {
user = new User();
user.setEmailAddress(tokenizer.nextToken());
user.setName(tokenizer.nextToken());
user.setId(Integer.valueOf(tokenizer.nextToken()));
rows = dao.saveUser(user);
}
return rows;
}
}
TestExecutors.java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.app.dao.UserDao;
import com.app.runnables.UserProcessor;
public class TestExecutors {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(5);
List<String> users = getUsersFromFile("C:\\new_users.txt");
UserDao dao = new UserDao();
for (String user : users) {
Future<Integer> future = service.submit(new UserProcessor(user, dao));
try {
System.out.println("Result of the operation is: " + future.get());
} catch (InterruptedException ex) {
Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
} catch (ExecutionException ex) {
Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
}
}
service.shutdown();
System.out.println("Main execution over!");
}
public static List<String> getUsersFromFile(String fileName) {
List<String> users = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)))) {
String line = null;
while ((line = reader.readLine()) != null) {
users.add(line);
}
} catch (FileNotFoundException ex) {
Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(TestExecutors.class.getName()).log(Level.SEVERE, null, ex);
}
return users;
}
}
db.sql
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`emailAddress` varchar(255) DEFAULT NULL
)
new_user.txt
henry@gmail.com,Henry Cavell,1001
brad@gmail.com,Brad Pitt,1002
leo@gmail.com,Leonardo DiCaprio,1003
matt@gmail.com,Matt Damon,1004
george@gmail.com,George Clooney,1005
daniel@gmail.com,Daniel Radcliffe,1006
emma@gmail.com,Emma Watson,1007
tom@gmail.com,Tom Hanks,1008
tom2@gmail.com,Tom Cruise,1009
christian@gmail.com,Christian Bale,1010
Here is the output -
pool-1-thread-1 processing record for : henry@gmail.com,Henry Cavell,1001
Result of the operation is: 1
pool-1-thread-2 processing record for : brad@gmail.com,Brad Pitt,1002
Result of the operation is: 1
pool-1-thread-3 processing record for : leo@gmail.com,Leonardo DiCaprio,1003
Result of the operation is: 1
pool-1-thread-4 processing record for : matt@gmail.com,Matt Damon,1004
Result of the operation is: 1
pool-1-thread-5 processing record for : george@gmail.com,George Clooney,1005
Result of the operation is: 1
pool-1-thread-1 processing record for : daniel@gmail.com,Daniel Radcliffe,1006
Result of the operation is: 1
pool-1-thread-2 processing record for : emma@gmail.com,Emma Watson,1007
Result of the operation is: 1
pool-1-thread-3 processing record for : tom@gmail.com,Tom Hanks,1008
Result of the operation is: 1
pool-1-thread-4 processing record for : tom2@gmail.com,Tom Cruise,1009
Result of the operation is: 1
pool-1-thread-5 processing record for : christian@gmail.com,Christian Bale,1010
Result of the operation is: 1
Main execution over!