-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathActorClosureFactory.java
More file actions
70 lines (58 loc) · 2.61 KB
/
ActorClosureFactory.java
File metadata and controls
70 lines (58 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class ActorClosureFactory {
private static Class<?>[] convertParamTypes(Object... args) {
Class<?>[] types = new Class<?>[args.length];
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
return types;
}
private static <T> T instantiate(Class<T> clazz, Object... args) {
Class<?>[] paramTypes = convertParamTypes(args);
try {
return clazz
.getConstructor(paramTypes)
.newInstance(args);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Could not instantiate " + clazz, e);
}
}
public static <T> Actor createActor(Class<T> clazz, Object... args) {
final AtomicBoolean isProcessing = new AtomicBoolean(false);
final Queue<Operation> queue = new ConcurrentLinkedQueue<>();
final T state = instantiate(clazz, args);
return new Actor() {
@Override
public void process() {
new Thread(() -> {
if (!isProcessing.compareAndSet(false, true))
return;
while (!queue.isEmpty()) {
Operation operation = queue.poll();
CompletableFuture<Object> resolve = operation.resolve();
try {
Method method = state.getClass().getMethod(operation.method(), convertParamTypes(operation.args()));
Object result = method.invoke(state, operation.args());
resolve.complete(result);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
resolve.completeExceptionally(e);
}
}
isProcessing.set(false);
}).start();
}
@Override
public CompletableFuture<Object> send(String method, Object... args) {
CompletableFuture<Object> resultReference = new CompletableFuture<>();
queue.add(new Operation(method, args, resultReference));
this.process();
return resultReference;
}
};
}
}