Quick Answer
Multithreading lets a program run several tasks concurrently (and in parallel on multi-core hardware). Use the Thread class, the Runnable interface, or an ExecutorService to run work concurrently, and protect shared state with proper synchronization.
Understanding the Issue
The Problem
The following code demonstrates two common issues:
// Problem 1: Race condition without synchronization
/**
 * Deliberately unsynchronized counter used to illustrate a race condition.
 * Nothing here guards {@code count}, so it must not be shared between threads.
 */
public class Counter {
    private int count = 0;

    /** Adds one to the counter with a plain, unguarded read-modify-write. */
    public void increment() {
        count += 1; // Not thread-safe - race condition possible
    }

    /** Returns the value currently stored in the counter. */
    public int getCount() {
        final int snapshot = count; // May return inconsistent values
        return snapshot;
    }
}
// Problem 2: Inefficient thread creation and management
/**
 * Starts one brand-new thread per list element, each running
 * {@code processItem} on that element. Shown as an anti-pattern: the number
 * of threads grows with the size of the list.
 *
 * @param items the items to hand to {@code processItem}
 */
public void processItems(List<String> items) {
    for (String item : items) {
        Runnable job = () -> processItem(item);
        Thread worker = new Thread(job); // Creates too many threads
        worker.start();
    }
}
The Solution
Here's the corrected code:
// Solution 1: Basic thread creation and execution
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
// Method 1: Extending Thread class
/**
 * Thread subclass that announces a named task, sleeps for two seconds to
 * simulate work, and then reports either completion or interruption.
 */
class WorkerThread extends Thread {
    private final String taskName;

    /**
     * @param taskName label included in every message this thread prints
     */
    public WorkerThread(String taskName) {
        this.taskName = taskName;
    }

    /** Prints a start line, simulates the work, then prints the outcome. */
    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println("Starting task: " + taskName + " on thread: " + threadName);

        boolean interrupted = false;
        try {
            Thread.sleep(2000); // Simulate work
        } catch (InterruptedException e) {
            interrupted = true;
        }

        if (interrupted) {
            // Restore the flag so code further up the stack can still see the interrupt.
            Thread.currentThread().interrupt();
            System.out.println("Task interrupted: " + taskName);
        } else {
            System.out.println("Completed task: " + taskName);
        }
    }
}
// Method 2: Implementing Runnable interface (preferred)
/**
 * Runnable that prints a start line, sleeps for 1.5 seconds to simulate
 * processing, and prints a finish line unless it was interrupted.
 */
class TaskRunner implements Runnable {
    private final String taskName;

    /**
     * @param taskName label included in the messages this task prints
     */
    public TaskRunner(String taskName) {
        this.taskName = taskName;
    }

    /** Announces the task, simulates processing, and reports completion. */
    @Override
    public void run() {
        System.out.println("Executing: " + taskName + " on " + Thread.currentThread().getName());
        if (!simulateProcessing()) {
            return; // interrupted: skip the "Finished" line
        }
        System.out.println("Finished: " + taskName);
    }

    /**
     * Sleeps for the simulated processing time.
     *
     * @return {@code true} if the sleep ran to completion, {@code false} if it
     *         was interrupted (the interrupt flag is restored in that case)
     */
    private static boolean simulateProcessing() {
        try {
            Thread.sleep(1500);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
// Basic thread usage
/**
 * Entry point showing three ways to define a thread's work: a Thread
 * subclass, a Runnable passed to Thread, and a lambda.
 */
public class BasicThreadExample {

    /**
     * Creates and starts five threads, waits for them in creation order, and
     * prints a final message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Constructed in this order so default thread names match creation order.
        Thread[] threads = {
            new WorkerThread("Database Update"),                  // Thread subclass
            new WorkerThread("File Processing"),
            new Thread(new TaskRunner("Email Sending")),          // Runnable with Thread
            new Thread(new TaskRunner("Report Generation")),
            new Thread(BasicThreadExample::runLambdaTask)         // lambda-style task (Java 8+)
        };

        for (Thread thread : threads) {
            thread.start();
        }

        // Wait for threads to complete; an interrupt abandons the remaining joins.
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("All tasks completed");
    }

    /** Body of the fifth thread: prints its thread name and sleeps for one second. */
    private static void runLambdaTask() {
        System.out.println("Lambda task running on: " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
// Solution 2: Thread synchronization and safety
// Thread-safe counter using synchronized
/**
 * Counter showing three ways to make an increment thread-safe: a
 * synchronized method, a synchronized block, and an {@link AtomicInteger}.
 *
 * <p>{@code count} is guarded by this instance's monitor in <em>every</em>
 * method that touches it. The synchronized method and the synchronized block
 * must use the same monitor; guarding one field with two different locks
 * would let {@link #increment()} and {@link #incrementWithBlock()} run at the
 * same time and lose updates.
 *
 * <p>{@code atomicCount} is a separate counter, independent of {@code count}.
 */
public class ThreadSafeCounter {
    private int count = 0;
    private final AtomicInteger atomicCount = new AtomicInteger(0);

    /** Increments {@code count} using a synchronized method (locks on {@code this}). */
    public synchronized void increment() {
        count++;
    }

    /**
     * Increments {@code count} using a synchronized block. The block locks on
     * {@code this}, the same monitor the synchronized methods use.
     */
    public void incrementWithBlock() {
        synchronized (this) {
            count++;
        }
    }

    /**
     * @return the current value of {@code count}, read under the same monitor
     *         that guards both increment methods
     */
    public synchronized int getCount() {
        return count;
    }

    /** Increments the lock-free atomic counter. */
    public void atomicIncrement() {
        atomicCount.incrementAndGet();
    }

    /**
     * @return the current value of the atomic counter
     */
    public int getAtomicCount() {
        return atomicCount.get();
    }
}
// Producer-Consumer pattern with wait/notify
/**
 * Bounded-buffer producer/consumer built on {@code wait}/{@code notifyAll}.
 * A single private lock guards the buffer; both sides re-check their
 * condition in a loop after every wake-up.
 */
public class ProducerConsumerExample {
    private final Queue<Integer> buffer = new LinkedList<>();
    private final int capacity = 5;
    private final Object lock = new Object();

    /**
     * Produces the values 1, 2, 3, ... forever, pausing 500 ms after each one
     * and blocking while the buffer holds {@code capacity} items.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    public void produce() throws InterruptedException {
        for (int next = 1; ; next++) {
            synchronized (lock) {
                // Wait until space available
                while (buffer.size() == capacity) {
                    lock.wait();
                }
                buffer.offer(next);
                System.out.println("Produced: " + next);
                // Notify waiting consumers
                lock.notifyAll();
            }
            // Simulate production time (outside the lock so consumers can run)
            Thread.sleep(500);
        }
    }

    /**
     * Consumes buffered values forever, pausing 1000 ms after each one and
     * blocking while the buffer is empty.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    public void consume() throws InterruptedException {
        for (;;) {
            synchronized (lock) {
                // Wait until item available
                while (buffer.isEmpty()) {
                    lock.wait();
                }
                int taken = buffer.poll();
                System.out.println("Consumed: " + taken);
                // Notify waiting producers
                lock.notifyAll();
            }
            // Simulate consumption time (outside the lock so the producer can run)
            Thread.sleep(1000);
        }
    }
}
// Solution 3: Modern concurrency with ExecutorService
/**
 * Demonstrations of the {@code java.util.concurrent} toolkit: executors,
 * {@link CompletableFuture}, concurrent collections, and coordination
 * primitives. Every demo shuts down the executors it creates before it
 * returns, so none of them keeps the JVM alive.
 */
public class ModernConcurrencyExample {

    /** Upper bound on how long a demo waits for an executor to drain. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60;

    /** Sentinel the queue producer sends to tell the consumer to stop. */
    private static final String END_OF_STREAM = "__END_OF_STREAM__";

    /**
     * Shows the four common executor factories, task submission, a
     * {@link Future} with a timeout, and scheduled tasks.
     */
    public void demonstrateExecutorService() {
        // Fixed thread pool
        ExecutorService fixedPool = Executors.newFixedThreadPool(4);
        // Cached thread pool (creates threads as needed)
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        // Single thread executor
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        // Scheduled thread pool
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

        try {
            // Submit tasks to fixed pool
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                fixedPool.submit(() -> {
                    System.out.println("Task " + taskId + " running on: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "Task " + taskId + " completed";
                });
            }

            // Submit a callable task (returns a result). It runs on the idle
            // single-thread executor: queued behind the ten 2-second tasks on the
            // 4-thread pool it would only finish after about 5 seconds and race
            // the 5-second timeout below.
            Future<String> future = singleThread.submit(() -> {
                Thread.sleep(1000);
                return "Callable result";
            });
            try {
                String result = future.get(5, TimeUnit.SECONDS); // Wait max 5 seconds
                System.out.println("Future result: " + result);
            } catch (InterruptedException e) {
                future.cancel(true);
                Thread.currentThread().interrupt();
                System.err.println("Future execution failed: " + e);
            } catch (TimeoutException e) {
                future.cancel(true); // nobody will read the result any more
                // TimeoutException carries no message, so print the exception itself.
                System.err.println("Future execution failed: " + e);
            } catch (ExecutionException e) {
                System.err.println("Future execution failed: " + e.getCause());
            }

            // Schedule tasks
            scheduledPool.schedule(() -> System.out.println("Delayed task"), 3, TimeUnit.SECONDS);
            scheduledPool.scheduleAtFixedRate(
                () -> System.out.println("Periodic task at: " + System.currentTimeMillis()),
                1, 2, TimeUnit.SECONDS
            );
        } finally {
            // Proper shutdown. The scheduled tasks keep running while the fixed
            // pool drains; shutting the scheduler down last then cancels the
            // periodic task, which would otherwise run forever.
            shutdownAndAwait(fixedPool);
            shutdownAndAwait(singleThread);
            shutdownAndAwait(cachedPool);
            shutdownAndAwait(scheduledPool);
        }
    }

    /**
     * Shows combining, chaining, and error handling with
     * {@link CompletableFuture}. Blocks until all three pipelines have printed
     * their result: the default async pool uses daemon threads, so without
     * waiting the output could be lost when the JVM exits.
     */
    public void demonstrateCompletableFuture() {
        // Simple async tasks
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "World";
        });

        // Combine futures
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
        CompletableFuture<Void> combinedPrinted =
            combined.thenAccept(result -> System.out.println("Combined result: " + result));

        // Chain operations
        CompletableFuture<String> chained = CompletableFuture
            .supplyAsync(() -> "initial")
            .thenApply(String::toUpperCase)
            .thenApply(s -> s + " PROCESSED")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " COMPOSED"));
        CompletableFuture<Void> chainedPrinted = chained.thenAccept(System.out::println);

        // Handle exceptions
        CompletableFuture<String> withException = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("Random failure");
                }
                return "Success";
            })
            .handle((result, exception) -> {
                if (exception == null) {
                    return result;
                }
                // A dependent stage sees the failure wrapped in a CompletionException;
                // unwrap it so the message is the original one.
                Throwable cause = exception;
                if (exception instanceof CompletionException && exception.getCause() != null) {
                    cause = exception.getCause();
                }
                return "Handled error: " + cause.getMessage();
            });
        CompletableFuture<Void> handledPrinted = withException.thenAccept(System.out::println);

        // Wait for every pipeline to finish printing before returning.
        CompletableFuture.allOf(combinedPrinted, chainedPrinted, handledPrinted).join();
    }

    /**
     * Shows a {@link ConcurrentHashMap} updated from many tasks and a
     * {@link BlockingQueue} shared by one producer and one consumer. The
     * producer ends the stream with a sentinel so the consumer, and therefore
     * the executor, can terminate.
     */
    public void demonstrateConcurrentCollections() {
        // Thread-safe map
        ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        // Thread-safe queue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            // Multiple threads updating map
            for (int i = 0; i < 100; i++) {
                final int value = i;
                executor.submit(() -> {
                    concurrentMap.put("key" + value, value);
                    concurrentMap.merge("sum", value, Integer::sum); // atomic per key
                });
            }

            // Producer
            executor.submit(() -> {
                try {
                    for (int i = 0; i < 20; i++) {
                        queue.put("Item " + i); // Blocks if queue is full
                        Thread.sleep(100);
                    }
                    queue.put(END_OF_STREAM); // tell the consumer there is nothing more
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Consumer
            executor.submit(() -> {
                try {
                    // take() blocks if queue is empty; stop at the sentinel
                    for (String item = queue.take(); !END_OF_STREAM.equals(item); item = queue.take()) {
                        System.out.println("Processing: " + item);
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } finally {
            shutdownAndAwait(executor);
        }
    }

    /**
     * Shows a two-phase job: a {@link CyclicBarrier} holds every task at the
     * end of phase 1, and a {@link CountDownLatch} lets the caller wait for all
     * tasks to finish.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the tasks
     */
    public void demonstrateCoordination() throws InterruptedException {
        int numberOfTasks = 5;
        CountDownLatch latch = new CountDownLatch(numberOfTasks);
        CyclicBarrier barrier = new CyclicBarrier(numberOfTasks, () ->
            System.out.println("All tasks reached barrier")
        );
        // One thread per task: every task must be running to trip the barrier.
        ExecutorService executor = Executors.newFixedThreadPool(numberOfTasks);
        try {
            for (int i = 0; i < numberOfTasks; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        // Phase 1
                        System.out.println("Task " + taskId + " starting phase 1");
                        Thread.sleep(1000 + taskId * 100);
                        System.out.println("Task " + taskId + " finished phase 1");
                        // Wait for all tasks to finish phase 1
                        barrier.await();
                        // Phase 2
                        System.out.println("Task " + taskId + " starting phase 2");
                        Thread.sleep(500);
                        System.out.println("Task " + taskId + " finished phase 2");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // Another task failed at the barrier; this thread was not
                        // interrupted, so do not set its interrupt flag.
                        System.err.println("Task " + taskId + " abandoned: barrier broken");
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // Wait for all tasks to complete
            latch.await();
            System.out.println("All tasks completed");
        } finally {
            // Either every task has counted down, or the caller was interrupted and
            // the remaining tasks should be cancelled; interrupting workers is safe
            // in both cases.
            executor.shutdownNow();
        }
    }

    /**
     * Stops accepting new tasks, waits up to {@link #SHUTDOWN_TIMEOUT_SECONDS}
     * for running ones, and forces termination if they do not finish or the
     * calling thread is interrupted (whose interrupt flag is then restored).
     */
    private static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
// Real-world example: Parallel file processing
/**
 * Processes a batch of files concurrently on a fixed-size thread pool and
 * reports each file's outcome once the whole batch has finished.
 * Call {@link #shutdown()} when the processor is no longer needed.
 */
public class ParallelFileProcessor {
    private final ExecutorService executor;
    private final int threadCount; // pool size this processor was created with

    /**
     * @param threadCount number of worker threads in the pool
     */
    public ParallelFileProcessor(int threadCount) {
        this.threadCount = threadCount;
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    /**
     * Starts asynchronous processing of every path and returns immediately.
     * When the last file finishes, a summary line and then one line per file
     * are printed: results to standard output, failures to standard error.
     *
     * @param filePaths paths of the files to process; an empty list completes at once
     */
    public void processFiles(List<String> filePaths) {
        List<CompletableFuture<String>> futures = filePaths.stream()
            .map(this::processFileAsync)
            .collect(Collectors.toList());

        CompletableFuture<Void> allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        // whenComplete runs on success AND on failure, so a failed file is
        // reported instead of silently suppressing all output.
        allOf.whenComplete((ignored, failure) -> {
            if (failure == null) {
                System.out.println("All files processed successfully");
            } else {
                System.err.println("File processing finished with errors");
            }
            for (CompletableFuture<String> future : futures) {
                try {
                    // Every future is already complete here, so join() never blocks.
                    System.out.println("Result: " + future.join());
                } catch (CompletionException | CancellationException e) {
                    Throwable cause = (e.getCause() != null) ? e.getCause() : e;
                    System.err.println("Error getting result: " + cause.getMessage());
                }
            }
        });
    }

    /**
     * Submits one file to the pool.
     *
     * @return a future that yields {@code "Processed: <path>"}, or fails with a
     *         RuntimeException if the worker is interrupted
     */
    private CompletableFuture<String> processFileAsync(String filePath) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate file processing
                Thread.sleep(1000 + (int) (Math.random() * 2000));
                return "Processed: " + filePath;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Processing interrupted", e);
            }
        }, executor);
    }

    /**
     * Stops accepting work and waits up to 60 seconds for running tasks.
     * Forces termination on timeout or if the calling thread is interrupted;
     * in the latter case the caller's interrupt flag is restored.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
Key Takeaways
Use ExecutorService instead of creating threads manually for better resource management. Implement proper synchronization with synchronized blocks, atomic classes, or concurrent collections. Leverage CompletableFuture for asynchronous programming. Always handle InterruptedException properly (restore the interrupt flag or propagate the exception) and shut down thread pools gracefully. Use modern concurrency utilities such as CountDownLatch and CyclicBarrier for thread coordination.