Compute API
Trueno Elasticsearch Spark Connector
Trueno's Elasticsearch connector for Apache Spark.
Requirements
An Elasticsearch 2.x cluster accessible through the [transport] API.
Architecture
Installation
Available through any Maven-compatible tool:
<dependency>
<groupId>org.trueno.elasticsearch.spark.connector</groupId>
<artifactId>elasticsearch-spark-connector</artifactId>
<version>0.0.1</version>
</dependency>
Using Trueno's Elasticsearch Spark Connector
spark-2.1.0-bin-hadoop2.7$ ./bin/spark-shell --jars elasticsearch-spark-connector.jar
Loading library from scala/spark
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.trueno.elasticsearch.spark.connector._
import org.trueno.elasticsearch.spark.connector._
scala> val transportClient = new ESTransportClient(index, sc)
transportClient: org.trueno.elasticsearch.spark.connector.ESTransportClient = org.trueno.elasticsearch.spark.connector.ESTransportClient@31b7112d
scala> val verticesRDD = transportClient.getVertexRDD()
Elasticsearch client retrieving vertices ...
API Example
/* Trueno ES Spark Connector */
import org.trueno.elasticsearch.spark.connector._
/* GraphX references */
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.rdd.RDD
/* Instantiate the transport client for the "biogrid" index, reusing the shell's SparkContext */
val transportClient = new ESTransportClient("biogrid", sc)
/* Retrieve the vertices and the edges through the connector */
val verticesRDD = transportClient.getVertexRDD()
val edgesRDD = transportClient.getEdgeRDD()
/* Obtain the graph from the connector */
val graph = transportClient.getGraph()
/* Run PageRank until convergence with a tolerance of 0.001 */
val g2 = PageRank.runUntilConvergence(graph, tol = 0.001)
Running PageRank from the Spark-Shell
scala
/**
* Source: PageRank.scala
* Author:
* Description: Spark Job Connector using Transport API
*/
import org.trueno.elasticsearch.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
/* GraphX references */
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.EdgeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import java.util.{Map => JMap}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaPairRDD.fromRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
/* Vertex record holding an id and a label (not referenced by the steps below) */
case class TruenoVertex(id: Long, label: Long)
/* Create the ES transport client for the "biogrid" index on the shell's SparkContext */
val tc = new ESTransportClient("biogrid", sc)
/* Fetch the vertices from ES */
val verticesESJavaRDD = tc.getVertexRDD()
/* Unwrap the underlying Scala RDD */
val verticesESRDD = verticesESJavaRDD.rdd
/* GraphX vertices: every id is paired with itself as the vertex attribute */
val vertexRDD: RDD[(VertexId, Long)] = verticesESRDD.map(id => (id, id))
/* Fetch the edges from ES and unwrap the underlying Scala RDD */
val scalaESSRDD = tc.getEdgeRDD().rdd
/* GraphX edges: each pair in every element becomes an Edge with the constant attribute 1 */
val edgesRDD: RDD[Edge[Long]] =
  scalaESSRDD.flatMap(pairs => pairs.map(pair => Edge(pair._1, pair._2, 1L)))
/* Assemble the graph from the vertex and edge RDDs */
val graph = Graph(vertexRDD, edgesRDD)
/* Run PageRank until convergence with a tolerance of 0.001 */
val g2 = PageRank.runUntilConvergence(graph, tol = 0.001)