Compute API

Trueno Elasticsearch Spark Connector

Trueno's Elasticsearch Connector for Apache Spark.

Requirements

An Elasticsearch 2.x cluster accessible through the transport protocol (the native Java transport client, default port 9300).
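
A quick way to verify that the cluster is reachable over transport is to open a plain Elasticsearch 2.x transport client and ask for cluster health. This is only a connectivity sketch; the cluster name, host, and port below are placeholders for your own deployment:

import java.net.InetAddress

import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress

/* Placeholder cluster name, host, and port -- adjust to your deployment */
val settings = Settings.settingsBuilder().put("cluster.name", "trueno").build()
val client = TransportClient.builder().settings(settings).build()
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))

/* Prints the cluster health status (GREEN/YELLOW/RED) if the node is reachable */
println(client.admin().cluster().prepareHealth().get().getStatus)
client.close()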

Architecture

Installation

Available through any Maven-compatible tool:

<dependency>
  <groupId>org.trueno.elasticsearch.spark.connector</groupId>
  <artifactId>elasticsearch-spark-connector</artifactId>
  <version>0.0.1</version>
</dependency>
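
For sbt users, the equivalent coordinate (assuming the artifact is resolvable from a repository configured in your build) would be:

libraryDependencies += "org.trueno.elasticsearch.spark.connector" % "elasticsearch-spark-connector" % "0.0.1"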

Using Trueno's Elasticsearch Spark Connector

spark-2.1.0-bin-hadoop2.7$ ./bin/spark-shell --jars elasticsearch-spark-connector.jar

Loading the library from the Scala Spark shell:

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.trueno.elasticsearch.spark.connector._
import org.trueno.elasticsearch.spark.connector._

scala> val transportClient = new ESTransportClient("biogrid", sc)
transportClient: org.trueno.elasticsearch.spark.connector.ESTransportClient = org.trueno.elasticsearch.spark.connector.ESTransportClient@31b7112d

scala> val verticesRDD = transportClient.getVertexRDD()
Elasticsearch client retrieving vertices ...
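
From here the result behaves like any other Spark RDD. As a sketch (assuming getVertexRDD() returns an RDD of vertex identifiers), it can be inspected directly from the shell:

/* Basic sanity checks on the retrieved vertices */
verticesRDD.count()   /* number of vertices read from the index */
verticesRDD.take(5)   /* peek at a few vertex ids */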

API Example

/* Trueno ES Spark Connector */
import org.trueno.elasticsearch.spark.connector._

/* GraphX references */
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.rdd.RDD

/* Instantiate the transport client against the "biogrid" index */
val transportClient = new ESTransportClient("biogrid", sc)

/* Read vertices from Elasticsearch */
val verticesRDD = transportClient.getVertexRDD()

/* Read edges from Elasticsearch */
val edgesRDD = transportClient.getEdgeRDD()

/* Build the GraphX graph from the connector */
val graph = transportClient.getGraph()

/* Run PageRank until convergence with tolerance 0.001 */
val g2 = PageRank.runUntilConvergence(graph, 0.001)
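
PageRank returns a new graph whose vertex attribute is the computed score, so the highest-ranked vertices can be read back directly. The snippet below is only an illustrative follow-up (the limit of 10 is arbitrary):

/* Top 10 vertices by PageRank score */
g2.vertices.sortBy(_._2, ascending = false).take(10).foreach(println)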

Running PageRank from the Spark shell

/**      
  * Source: PageRank.scala
  * Author: 
  * Description: Spark Job Connector using Transport API
  */
import org.trueno.elasticsearch.spark.connector._
import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

/* GraphX references */
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.EdgeRDD
import org.apache.spark.rdd.RDD

import org.apache.spark.SparkConf

import java.util.{Map => JMap}

import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap

import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaPairRDD.fromRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext


case class TruenoVertex(id: Long, label: Long)

/* Instantiate transport Client from ES */
val tc = new ESTransportClient("biogrid", sc)

/* Read vertices from ES */
val verticesESJavaRDD = tc.getVertexRDD()

/* Converting JavaRDD to RDD */
val verticesESRDD = verticesESJavaRDD.rdd

/* Generating the vertex pair RDD expected by GraphX (the id doubles as the vertex attribute) */
val vertexRDD: RDD[(VertexId, Long)] = verticesESRDD.map(x => (x, x))

/* Read edges from ES and convert the JavaRDD to an RDD */
val scalaESSRDD = tc.getEdgeRDD().rdd

/* Generating the GraphX edge RDD (each source/target pair becomes an Edge with attribute 1L) */
val edgesRDD: RDD[Edge[Long]] = scalaESSRDD.flatMap(x => x.map { case (src, dst) => Edge(src, dst, 1L) })

/* Creating Graph */
val graph = Graph(vertexRDD, edgesRDD)

/* Running PageRank on the graph */
val g2 = PageRank.runUntilConvergence(graph, 0.001)
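
As a quick check of the output (an illustrative follow-up, not part of the original job), the ranks can be sampled on the driver or written out; the HDFS path below is only a placeholder:

/* Sample a few (vertex id, rank) pairs */
g2.vertices.take(10).foreach(println)

/* Persist the full ranking for later analysis */
g2.vertices.saveAsTextFile("hdfs:///tmp/biogrid-pagerank")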
