Master 将大量任务分配给多个 Worker 并行执行,每个 Worker 将执行结果返回给 Master,再由 Master 将所有结果汇总后统一返回。
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { // 1 任务容器,多个worker执行,所以要线程安全的 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); // 2 worker的集合, master启动所有worker,不存在多线程 private List<Thread> workers = new ArrayList<Thread>(); // 3 worker执行任务的结果集合,多个worker执行,所以要线程安全的 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 4 构造方法 public Master(Worker worker, int workerCount) { worker.setWorkQueue(this.workQueue); worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) { this.workers.add(new Thread(worker,"Worker-"+i)); } } // 5 需要一个提交任务的方法 public void addTask(Task task) { this.workQueue.add(task); } // 6 需要有一个执行的方法,启动所有的worker方法去执行任务 public void execute() { for (Thread th : workers) { th.start(); } } // 7 判断是否运行结束的方法 public boolean isComplete() { for (Thread th : workers) { //所有线程状态为执行完成, The thread has completed execution. if (th.getState() != Thread.State.TERMINATED) { return false; } } return true; } // 8 将所有结果返回,根据业务自己处理 public Map<String, Object> getResult() { return resultMap; } }
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Base class for workers in the Master-Worker pattern. A worker repeatedly
 * takes a task from the shared queue, processes it via {@link #handle(Task)},
 * and stores the outcome in the shared result map under the task's id.
 *
 * <p>The queue and the result map must be supplied through the setters before
 * {@link #run()} is invoked.
 */
public abstract class Worker implements Runnable {

    /** Queue the tasks are polled from; injected via {@link #setWorkQueue}. */
    private ConcurrentLinkedQueue<Task> workQueue;

    /** Map the results are written to; injected via {@link #setResultMap}. */
    private ConcurrentHashMap<String, Object> resultMap;

    /**
     * Sets the queue this worker polls tasks from.
     *
     * @param workQueue the shared task queue
     */
    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    /**
     * Sets the map this worker stores results in.
     *
     * @param resultMap the shared result map, keyed by task id
     */
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Drains the work queue: polls tasks until the queue is empty, handling each
     * one and recording its result.
     *
     * <p>A result is recorded only when both the task id and the result are
     * non-null, because {@link ConcurrentHashMap} rejects null keys and values;
     * storing them would throw and end this worker while tasks are still queued.
     * Such tasks are still handled, they just leave no entry in the result map.
     *
     * @throws IllegalStateException if the work queue or result map was never set
     */
    @Override
    public void run() {
        if (this.workQueue == null || this.resultMap == null) {
            throw new IllegalStateException(
                    "Worker requires setWorkQueue and setResultMap to be called before run()");
        }

        Task task;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((task = this.workQueue.poll()) != null) {
            Object result = handle(task);
            String id = task.getId();
            if (id != null && result != null) {
                this.resultMap.put(id, result);
            }
        }
    }

    /**
     * Processes a single task.
     *
     * @param task the task taken from the work queue
     * @return the result to record for the task; a null result is not recorded
     */
    public abstract Object handle(Task task);
}
/**
 * Base type for units of work. It carries only a string identifier; concrete
 * subclasses add their own payload.
 */
public abstract class Task {

    /** Identifier of this task; {@code null} until {@link #setId(String)} is called. */
    private String identifier;

    /**
     * Assigns the identifier of this task.
     *
     * @param id the new identifier
     */
    public void setId(String id) {
        identifier = id;
    }

    /**
     * Returns the identifier of this task.
     *
     * @return the identifier last passed to {@link #setId(String)}, or {@code null} if none was set
     */
    public String getId() {
        return identifier;
    }
}
/**
 * Worker for {@code NumTask} instances: the result of a task is its number
 * multiplied by 10.
 */
public class NumWorker extends Worker {

    /**
     * Handles one task by waiting 500 ms and then returning the task's number
     * times 10.
     *
     * @param task the task to process; must be a {@code NumTask}
     * @return the task's number multiplied by 10, boxed as an {@code Integer}
     * @throws IllegalArgumentException if {@code task} is null or not a {@code NumTask}
     */
    @Override
    public Object handle(Task task) {
        // Reject unsupported tasks explicitly instead of failing later with a
        // NullPointerException that does not say what went wrong.
        if (!(task instanceof NumTask)) {
            String actual = (task == null) ? "null" : task.getClass().getName();
            throw new IllegalArgumentException("NumWorker can only handle NumTask, got: " + actual);
        }
        NumTask numTask = (NumTask) task;

        try {
            // Fixed pause before producing the result (presumably simulating slow work).
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see the
            // interruption; the result is still computed below.
            Thread.currentThread().interrupt();
        }

        return numTask.getNum() * 10;
    }
}
/**
 * Task whose payload is a single integer.
 */
public class NumTask extends Task {

    /** The integer payload; 0 until {@link #setNum(int)} is called. */
    private int value;

    /**
     * Stores the integer payload of this task.
     *
     * @param num the number to store
     */
    public void setNum(int num) {
        value = num;
    }

    /**
     * Returns the integer payload of this task.
     *
     * @return the number last passed to {@link #setNum(int)}
     */
    public int getNum() {
        return value;
    }
}
public class Demo { public static void main(String[] args) { long start = System.currentTimeMillis(); Master master = new Master(new NumWorker(), 200); Random r = new Random(); for (int i = 1; i <= 100; i++) { NumTask t = new NumTask(); t.setId(""+i); t.setNum(r.nextInt(1000));// master.addTask(t); } master.execute(); int sum = 0 ; while (true) { if (master.isComplete()) { long end = System.currentTimeMillis() - start; Map<String, Object> result = master.getResult(); for(Entry<String, Object> entry : result.entrySet()) { sum += (int)entry.getValue() ; } System.out.println("最终结果:" + sum + ", 执行时间:" + end); break; } } } }