写了个简单的java分布式离线计算框架

最近复习Java多线程,对分布式计算突然有了浓厚的兴趣。顺手就写了个迷你小框架。

tinympi4j-master

https://github.com/binaryer/tinympi4j-master

a micro java offline distributed computation framework for fun, DO NOT use in production environment !
微型java分布式离线计算框架

原理

tinympi4j-master创建任务并提交到tinympi4j-slave执行, 执行完毕后把结果汇总到tinympi4j-master
tinympi4j-slave可动态加载执行class文件,如需增加新功能,只需在tinympi4j-master端新增任务类,而无需修改tinympi4j-slave端代码

特性

  • 简单直观, 没有任何学习难度
  • slave支持多个任务并发/并行执行
  • 使用HTTP协议通信
  • 场景: 找素数/grep/wordcount/超大文件或大量小文件处理
  • 不支持复杂数据类型
  • 没有进度监控,健康监控,无容错功能

使用流程

  1. 在多个计算节点启动 tinympi4j-slave
    java -jar tinympi4j-slave-0.1.jar {port}

  2. (在tinympi4j-master端) 编写任务类, 实现SplitableTask接口

  3. (在tinympi4j-master端) 参考下面代码,把任务提交到计算节点执行

  4. (在tinympi4j-master端) 等待所有计算节点执行完毕,获取结果

例子

分布式计算10000以内的素数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
//启动master上的tomcat
final int masterport = 8086;
final String masterurl = "http://192.168.1.100:" + masterport;
TomcatTool.startMasterTomcat(masterport);
//创建任务
final BigTask<Integer> bigtask = BigTask.create(masterurl);
//添加任务到两台计算节点, 请确保计算节点上的 tinympi4j-slave 已启动
//关于计算节点: https://github.com/binaryer/tinympi4j-slave
bigtask.addTask2Slave("http://192.168.1.101:1234", PrimeSplitedtask.class, new Integer[] { 2, 5000 });
bigtask.addTask2Slave("http://192.168.1.102:1234", PrimeSplitedtask.class, new Integer[] { 5001, 10000 });
//等待所有节点执行完毕
final Collection<Integer> resultset = bigtask.executeAndWait();
//打印结果
for (int n : resultset){
//System.out.println(n);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//创建SplitableTask的实现类
public class PrimeSplitedtask implements SplitableTask {
@Override
public Serializable execute(Serializable[] params) {
final int fromnumber = (Integer) params[0];
final int tonumber = (Integer) params[1];
final Set<Integer> resultset = new LinkedHashSet<Integer>();
for (int i = fromnumber; i <= tonumber; i++) {
if (isprime(i))
resultset.add(i);
}
return (Serializable) resultset;
}
//判断是否为素数
private boolean isprime(int number) {
int n = 2;
while (true) {
if (number % n == 0 && number!=n)
return false;
n++;
if (n > Math.sqrt(number))
return true;
}
}
}

后续完善

  • 子任务进度查询
  • slave端更多的设置选项: 如线程池大小
  • 单个子任务完成异步回调
  • 总任务完成异步回调
  • 暂停/继续/取消执行中的任务
  • 支持所有数据类型
  • 支持压缩传输
  • 支持未完成的任务回传已完成结果
  • 支持子节点故障转移