最近复习Java多线程,对分布式计算突然有了浓厚的兴趣。顺手就写了个迷你小框架。
tinympi4j-master
https://github.com/binaryer/tinympi4j-master
a micro java offline distributed computation framework for fun, DO NOT use in production environment !
微型java分布式离线计算框架
原理
tinympi4j-master
创建任务并提交到tinympi4j-slave
执行, 执行完毕后把结果汇总到tinympi4j-master
tinympi4j-slave
可动态加载执行class文件,如需增加新功能,只需在tinympi4j-master
端新增任务类,而无需修改tinympi4j-slave
端代码

特性
- 简单直观, 没有任何学习难度
- slave支持多个任务并发/并行执行
- 使用HTTP协议通信
- 场景: 找素数/grep/wordcount/超大文件或大量小文件处理
- 不支持复杂数据类型
- 没有进度监控,健康监控,无容错功能
使用流程
在多个计算节点启动 tinympi4j-slave
java -jar tinympi4j-slave-0.1.jar {port}
(在tinympi4j-master端) 编写任务类, 实现SplitableTask
接口
(在tinympi4j-master端) 参考下面代码,把任务提交到计算节点执行
(在tinympi4j-master端) 等待所有计算节点执行完毕,获取结果
例子
分布式计算10000以内的素数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) { final int masterport = 8086; final String masterurl = "http://192.168.1.100:" + masterport; TomcatTool.startMasterTomcat(masterport); final BigTask<Integer> bigtask = BigTask.create(masterurl); bigtask.addTask2Slave("http://192.168.1.101:1234", PrimeSplitedtask.class, new Integer[] { 2, 5000 }); bigtask.addTask2Slave("http://192.168.1.102:1234", PrimeSplitedtask.class, new Integer[] { 5001, 10000 }); final Collection<Integer> resultset = bigtask.executeAndWait(); for (int n : resultset){ } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class PrimeSplitedtask implements SplitableTask { @Override public Serializable execute(Serializable[] params) { final int fromnumber = (Integer) params[0]; final int tonumber = (Integer) params[1]; final Set<Integer> resultset = new LinkedHashSet<Integer>(); for (int i = fromnumber; i <= tonumber; i++) { if (isprime(i)) resultset.add(i); } return (Serializable) resultset; } private boolean isprime(int number) { int n = 2; while (true) { if (number % n == 0 && number!=n) return false; n++; if (n > Math.sqrt(number)) return true; } } }
|
后续完善
- 子任务进度查询
- slave端更多的设置选项: 如线程池大小
- 单个子任务完成异步回调
- 总任务完成异步回调
- 暂停/继续/取消执行中的任务
- 支持所有数据类型
- 支持压缩传输
- 支持未完成的任务回传已完成结果
- 支持子节点故障转移