Java多线程--5 Master-worker模式并行模式



Master将很多任务分配给多个worker执行,worker将执行的结果返回给Master,master将所有结果统一返回出去。


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

	// 1 任务容器,多个worker执行,所以要线程安全的
	private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

	// 2 worker的集合, master启动所有worker,不存在多线程
	private List<Thread> workers = new ArrayList<Thread>();

	// 3 worker执行任务的结果集合,多个worker执行,所以要线程安全的
	private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

	// 4 构造方法
	public Master(Worker worker, int workerCount) {
		worker.setWorkQueue(this.workQueue);
		worker.setResultMap(this.resultMap);

		for (int i = 0; i < workerCount; i++) {
			this.workers.add(new Thread(worker,"Worker-"+i));
		}
	}

	// 5 需要一个提交任务的方法
	public void addTask(Task task) {
		this.workQueue.add(task);
	}

	// 6 需要有一个执行的方法,启动所有的worker方法去执行任务
	public void execute() {
		for (Thread th : workers) {
			th.start();
		}
	}

	// 7 判断是否运行结束的方法
	public boolean isComplete() {
		for (Thread th : workers) { //所有线程状态为执行完成, The thread has completed execution.
			if (th.getState() != Thread.State.TERMINATED) {
				return false;
			}
		}
		return true;
	}

	// 8 将所有结果返回,根据业务自己处理
	public Map<String, Object> getResult() {
		return resultMap;
	}

}


import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public abstract class Worker implements Runnable {

	private ConcurrentLinkedQueue<Task> workQueue;
	private ConcurrentHashMap<String, Object> resultMap;

	public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	@Override
	public void run() {
		while (true) {
			Task task = this.workQueue.poll();
			if (task == null)
				break;
			Object result = handle(task);
			this.resultMap.put(task.getId(), result);
		}
	}

	public abstract Object handle(Task task)  ;

}


public abstract class Task {
	
	private String id;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

}



public class NumWorker extends Worker{

	@Override
	public Object handle(Task task) {
		NumTask numTask = null ;
		if(task instanceof NumTask) {
			numTask = (NumTask)task ;
		}
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		return numTask.getNum() * 10;
	}

}


public class NumTask extends Task{

	private int num ;

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}
	
	
}



public class Demo {

	public static void main(String[] args) {
		
		long start = System.currentTimeMillis();
		
		Master master = new Master(new NumWorker(), 200);
		Random r = new Random();
		for (int i = 1; i <= 100; i++) {
			NumTask t = new NumTask();
			t.setId(""+i);
			t.setNum(r.nextInt(1000));//
			master.addTask(t);
		}
		master.execute();

		int sum = 0 ;
		while (true) {
			if (master.isComplete()) {
				long end = System.currentTimeMillis() - start;
				Map<String, Object> result = master.getResult();
				for(Entry<String, Object> entry : result.entrySet()) {
					sum += (int)entry.getValue() ;
				}
				
				System.out.println("最终结果:" + sum + ", 执行时间:" + end);
				break;
			}
		}
	}

}






多线程 master-worker模式

2020.11.18 20:47

https://www.meihaocloud.com.com/247.html , 欢迎转载,请在文章页标出原文连接 !


Copyright © 2020 千夕网 联系站长

粤公网安备 44030302001408号 粤ICP备19099833号-1