I began the day with a Skype call with Asterios and Stephan from the Flink team. I learned more about how to use and test the code I write for Flink, and found several places where I had not quite understood Flink's low-level behavior. I have a long list of fixes and to-dos, but the code generator is looking pretty good! Check out the code below.
Andrew, Bill, Jake, Joe and I met to discuss the Fall edition of the Data Science Incubator. We are really excited to run the program again, and we’re hoping it will be productive and fun!
For more information about the UW Data Science Incubator: http://data.uw.edu/incubator/
My Raco → Flink code generator automatically produced the following Flink-0.6 Java program from the 8-line MyriaL program embedded in the source below:
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.*;
import org.apache.flink.api.java.aggregation.*;
import org.apache.flink.api.java.tuple.*;

// Original query:
// emp = scan(public:adhoc:employee);
// emp1 = scan(public:adhoc:employee);
// j = [from emp, emp1
//      where (emp1.$2 = "Magdalena Balazinska"
//             or emp1.salary < 25000)
//        and emp1.$0 = emp.$0
//      emit emp1.*];
// store(j, OUTPUT);

public class FlinkQuery {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Scan(public:adhoc:employee)
        DataSet<Tuple4<Long,Long,String,Long>> employee = load_employee(env);

        // Apply(id=$0)[employee]
        DataSet<Tuple1<Long>> Apply1 = employee.project(0).types(Long.class);

        // Scan(public:adhoc:employee)
        // skipped -- already computed

        // Select((($2 = "Magdalena Balazinska") or (salary < 25000)))[employee]
        DataSet<Tuple4<Long,Long,String,Long>> Select1 = employee.filter(new FilterFunction<Tuple4<Long,Long,String,Long>>() {
            @Override
            public boolean filter(Tuple4<Long,Long,String,Long> t) {
                return ((t.f2).equals("Magdalena Balazinska")) || ((t.f3) < (25000L));
            }
        }).name("(($2 = \"Magdalena Balazinska\") or (salary < 25000))");

        // ProjectingJoin(($1 = $0); $1, $2, $3, $4)[Apply1,Select1]
        DataSet<Tuple4<Long,Long,String,Long>> ProjectingJoin1 = Apply1.joinWithHuge(Select1).where(0).equalTo(0).projectSecond(0,1,2,3).types(Long.class,Long.class,String.class,Long.class);

        // Store(public:adhoc:OUTPUT)[ProjectingJoin1]
        ProjectingJoin1.writeAsCsv("file:///tmp/flink/OUTPUT");

        env.execute("MyriaL query");
    }

    private static DataSet<Tuple4<Long,Long,String,Long>> load_employee(ExecutionEnvironment env) {
        return env.readCsvFile("file:///tmp/flink/employee").types(Long.class,Long.class,String.class,Long.class);
    }
}
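To make the generated plan easier to follow, here is a minimal plain-Java sketch of the same three steps (project the id column, filter on name or salary, then join on the id and keep the filtered side's columns), run over a tiny in-memory list instead of a Flink DataSet. Everything in it is an assumption for illustration: the names PlanSketch, Row, sampleRows and run are hypothetical, the sample rows are made up, and the nested-loop join only stands in for whatever join strategy Flink actually picks.

```java
import java.util.ArrayList;
import java.util.List;

public class PlanSketch {

    // Hypothetical stand-in for Tuple4<Long,Long,String,Long>; fields named like Flink's f0..f3.
    static final class Row {
        final long f0;
        final long f1;
        final String f2;
        final long f3;

        Row(long f0, long f1, String f2, long f3) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
            this.f3 = f3;
        }
    }

    // Made-up sample data, not taken from the real employee table.
    static List<Row> sampleRows() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1L, 10L, "Magdalena Balazinska", 90000L)); // matches on name
        rows.add(new Row(2L, 10L, "Alice", 20000L));                // matches on salary
        rows.add(new Row(3L, 20L, "Bob", 50000L));                  // matches neither
        return rows;
    }

    static List<Row> run(List<Row> employee) {
        // Apply(id=$0): keep only column 0 of every row.
        List<Long> apply1 = new ArrayList<>();
        for (Row r : employee) {
            apply1.add(r.f0);
        }

        // Select: same predicate as the generated FilterFunction.
        List<Row> select1 = new ArrayList<>();
        for (Row r : employee) {
            if (r.f2.equals("Magdalena Balazinska") || r.f3 < 25000L) {
                select1.add(r);
            }
        }

        // ProjectingJoin: equi-join on the id, emitting only the columns of the filtered side.
        List<Row> joined = new ArrayList<>();
        for (long id : apply1) {
            for (Row r : select1) {
                if (id == r.f0) {
                    joined.add(r);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Print the result as comma-separated lines, loosely mirroring writeAsCsv.
        for (Row r : run(sampleRows())) {
            System.out.println(r.f0 + "," + r.f1 + "," + r.f2 + "," + r.f3);
        }
    }
}
```

With these sample rows, only the two rows that pass the filter survive the join. Because the join emits one output row per matching pair, duplicate ids in the input would produce duplicate output rows.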