Real-time Image Processing using Apache Flink

In this post I am going to explain how to build a (quick and dirty) real-time image processing pipeline using Apache Flink (the same thing can be done easily using Apache Spark).

I am going to focus first on convolutions for both matrices and functions. The idea is as follows:

  • Images are read from Apache Kafka with the following format:

imageName;frame;x;y;value

  • Images are processed using Apache Flink
  • Processed images are streamed back using Apache Kafka
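For instance, the first few pixels of frame 0 of a (hypothetical) image called cat would be sent as:

cat;0;0;0;0.12
cat;0;0;1;0.80
cat;0;1;0;0.33
cat;0;1;1;0.95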

First of all, let’s create a case class for pixels:

case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double) {
  lazy val keyFrameXY = name + "-" + frame + "-" + x + "-" + y
}

object Pixel {
  def apply(s: String): Pixel = {
    val stringList = s.split(';')
    Pixel(stringList(0), stringList(1).toInt,
          stringList(2).toInt, stringList(3).toInt,
          stringList(4).toDouble)
  }
}
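As a quick sanity check, parsing one of the (illustrative) messages above gives:

val p = Pixel("cat;0;1;1;0.95")
// p == Pixel("cat", 0, 1, 1, 0.95)
// p.keyFrameXY == "cat-0-1-1"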

In order to implement convolution we are going to use the flatMapReduce method introduced in my previous post. A simple implementation looks like this:

def convolutionMatrix(m: DenseMatrix[Double])(in: DataStream[Pixel])
    : DataStream[Pixel] = {
  require(m.rows % 2 == 1 && m.cols % 2 == 1)
  val area = m.cols * m.rows

  def expand(p: Pixel): TraversableOnce[Pixel] = ???

  class reduce
    extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
    override
    def apply(key: String, window: GlobalWindow, input: Iterable[Pixel],
              out: Collector[Pixel]): Unit = ???
  }

  in.flatMap( x => expand(x) )
    .keyBy(_.keyFrameXY)
    .countWindow(area)
    .apply(new reduce)
}

We need to implement the expand and reduce functions. For each incoming pixel, the expand function creates one copy per pixel in its neighborhood, each weighted by the corresponding kernel value. You can write something like:

def expand(p: Pixel): TraversableOnce[Pixel] = {
  val sizeX = (m.rows - 1) / 2
  val sizeY = (m.cols - 1) / 2
  for {
    x <- -sizeX to sizeX
    y <- -sizeY to sizeY
  } yield Pixel(p.name, p.frame,
                (p.x - x + pointsX) % pointsX,  // pointsX, pointsY: image dimensions (assumed defined elsewhere)
                (p.y - y + pointsY) % pointsY,
                p.v * m(sizeX + x, sizeY + y))
}
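Note how this plays together with countWindow(area) in convolutionMatrix: with a 3×3 kernel, for instance, every incoming pixel emits nine weighted copies, one targeting each coordinate of its neighbourhood, so every output coordinate of a frame eventually receives exactly area contributions. The count window fires precisely when all of them have arrived for a given key.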

The reduce function is very simple: it just sums all the values that share the same pixel coordinates.

class reduce
  extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
  override
  def apply(key: String,
            window: GlobalWindow,
            input: Iterable[Pixel],
            out: Collector[Pixel]): Unit =
    out.collect(
      Pixel(input.head.name,
            input.head.frame,
            input.head.x, input.head.y,
            input.map(_.v).sum))
}

Changing from matrix convolution to function convolution is trivial; you can write:

def convolutionFunction
    (f: (Int,Int) => Double, sizeX: Int, sizeY: Int)
    (in: DataStream[Pixel]): DataStream[Pixel] = {

  val area = (2*sizeX + 1)*(2*sizeY + 1)
  // Expand function
  def expand(p: Pixel): TraversableOnce[Pixel] = {
    for {
      x <- -sizeX to sizeX
      y <- -sizeY to sizeY
    } yield Pixel(p.name, p.frame,
                  (p.x - x + pointsX) % pointsX,
                  (p.y - y + pointsY) % pointsY,
                  p.v * f(x,y))
  }

class reduce
  extends WindowFunction[Pixel, Pixel,String, GlobalWindow] {

  override def apply(key: String, window: GlobalWindow,
                     input: Iterable[Pixel],
                     out: Collector[Pixel]):Unit = {

 out.collect(
Pixel(input.head.name,
      input.head.frame,
      input.head.x,
      input.head.y,
      input.map(_.v).sum))
  }
}

in.flatMap( x => expand(x) )
  .keyBy(_.keyFrameXY)
  .countWindow(area)
  .apply(new reduce )
}

It is very easy, isn’t it? Now we can create the main function:

def main(args: Array[String]): Unit = {
// Define some kernels
// Gaussian
def Gaussian(s: Double)(x: Int, y: Int): Double =
  1.0 / 2 / math.Pi / s / s * math.exp( -(x*x + y*y)/2.0/s/s)
// Laplacian of Gaussian
def LoG(s: Double)(x: Int, y: Int): Double =
  1.0 / math.Pi / math.pow(s,4) *
  (1.0 - (x*x + y*y)/2.0/s/s) * math.exp( -(x*x + y*y)/2.0/s/s)
// Blur
val blur = DenseMatrix.create(3,3,
 Array(0.0,0.2,0.0,
       0.2,0.2,0.2,
       0.0,0.2,0.0))
// Start streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read data from Kafka
val rawStream = env
   .addSource(new FlinkKafkaConsumer09[String]("imageIn",
                       new SimpleStringSchema(), properties))
// From String to Pixels
val pixelStream: DataStream[Pixel] = rawStream.map(Pixel(_) )
// Matrix Convolution
val pixelStreamMatrix = convolutionMatrix(blur)(pixelStream)
// Function Convolution
val LoG14: (Int,Int) => Double = LoG(1.4)
val pixelStreamFunction = convolutionFunction(LoG14 ,5,5)(pixelStream)

// New data to Kafka (pixels serialized back to the semicolon-separated format)
pixelStreamMatrix
  .map(p => s"${p.name};${p.frame};${p.x};${p.y};${p.v}")
  .addSink(new FlinkKafkaProducer09[String]("localhost:9092", "imageMatrix",
                                            new SimpleStringSchema()))
pixelStreamFunction.addSink(...)

env.execute("Image Processing using Apache Flink")
}

Big Data beyond Machine Learning: a heating example

Open source big data technologies (BDT) like Apache Spark or Apache Flink are being widely used for distributed processing of huge amounts of data (mostly counting words ;)).

On the other side, complex data analysis can be “easily” performed using open source libraries (e.g. machine learning frameworks like scikit-learn or caret) in languages like Python or R. These libraries are usually based on optimized libraries written in low-level languages like C or Fortran, while providing a friendlier coding style.

Unfortunately, combining both and doing distributed complex data analysis à la big data is not as mature a topic as one would like. In my opinion, an Achilles’ heel of BDT is the absence of optimized distributed linear algebra libraries like those in High Performance Computing (HPC) (e.g. PBLAS, ScaLAPACK, PARPACK). But that is another story and shall be told another time …

In this post, I would like to explore the possibilities that big data technologies (will) provide for performing distributed numerical simulations. Numerical simulation is a broad field, so let’s consider a concrete example, the heat equation:

\frac{\partial T}{\partial t} = \alpha\nabla^2 T + \frac{1}{c_p\rho}q

This equation is deeply connected with many interesting problems in physics, chemistry or financial mathematics, so I consider it may be an interesting starting point.

Taking a naïve approach, let me use the explicit (forward) scheme to integrate the heat equation. After a bit of algebra we obtain:

T^{n+1}_{i,j,k} = (1 - 6 r)T^{n}_{i,j,k} + r'q_{i,j,k} +  r \sum_{m=\pm 1}(T^{n}_{i+m,j,k}+T^{n}_{i,j+m,k}+T^{n}_{i,j,k+m}) 

where r =  \frac{\alpha a_t}{a_x^2} and r' = \frac{a_t}{c_p\rho}.
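As a quick sanity check, these coefficients can be computed directly. The sketch below uses purely illustrative values for the diffusivity, the time step, the lattice spacing and the material properties, and verifies the usual stability condition of the 3D explicit scheme, r ≤ 1/6:

// Illustrative constants (assumptions, not fixed by the discussion above)
val alpha = 1.0e-4   // thermal diffusivity (m^2/s)
val aT    = 0.1      // time step a_t (s)
val aX    = 0.01     // lattice spacing a_x (m)
val cp    = 450.0    // specific heat c_p (J/(kg*K))
val rho   = 7800.0   // density (kg/m^3)

val r  = alpha * aT / (aX * aX)  // r  = alpha * a_t / a_x^2
val rP = aT / (cp * rho)         // r' = a_t / (c_p * rho)

require(r <= 1.0 / 6.0, s"explicit scheme unstable: r = $r > 1/6")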

Leaving aside stability and precision problems, we would like to integrate this equation in a parallel distributed way. If you have been in touch with HPC and MPI, a straightforward strategy could be:

  1. Split the lattice into blocks (points and boundary terms).
  2. Assign each block to a processor.
  3. Integrate the points.
  4. Update the boundaries.
  5. Iterate.

Unfortunately, using BDT we cannot do that. In this case, data is going to be distributed randomly across the nodes. In principle, you don’t know which node holds each data point, nor which one holds its neighbours. So, we have to think about a parallel algorithm under this assumption. What about this?

  1. Consider a configuration as pairs: (coordinate, values).
  2. For each pair, create copies with coordinates corresponding to its neighbours.
  3. Group all the pairs with the same coordinates.
  4. For each group, operate to obtain the updated value.
  5. Iterate.

Maybe this sounds a bit weird if you are coming from HPC, but in terms of the functional programming paradigm used by BDT it seems quite a natural approach. In addition, there is an extra bonus: coding these functions in BDT is extremely simple.

Let’s see how to do this using Scala. First of all, we define a general function for a single iteration:

// Create three case classes to organise the data
case class Coor(x: Int, y: Int, z: Int)
case class Value(T: Double, Q: Double)
case class Point(coor: Coor, value: Value)

// Define a function to do a single iteration
def update(field: RDD[Point],
           expand: Point => List[Point],
           reduce: (Coor, List[Point]) => Point): RDD[Point] =
  field
    .flatMap( expand(_) )
    .groupBy(_.coor)
    .map { case (coor, values) => reduce(coor, values.toList) }

What is this function doing?

Let’s understand the signature first: the function receives a set of Points (field), a function (expand) that receives a Point and returns a set of Points, and a function (reduce) that receives a Coor and a set of Points and returns a Point; finally, update returns a set of Points.

The body of the function is very succinct, and it can be related directly to steps 2-4 defined previously.

This function is quite general, so one can use it to solve other differential equations, Monte Carlo simulations, … just by defining the proper expand and reduce functions (yeah … sometimes functional programming is amazing).

In this case, we are considering the heat equation, so let’s give an example of expand and reduce functions. The expand function receives a Point and returns a list of Points with the coordinates corresponding to its neighbours and values according to the discretized equation above. In this case the neighbours are the first neighbours in each direction, so we can write:

// c1 = 1 - 6r, c2 = r and c3 = r' are the coefficients of the discretized equation
// (assumed to be defined elsewhere, as is isOutBox, which is true outside the lattice)
def expand(p: Point): List[Point] = {
  val Point(Coor(x,y,z), Value(T,Q)) = p
  List(
    Point(Coor(x,y,z),     Value(c1*T + c3*Q, Q)),
    Point(Coor(x + 1,y,z), Value(c2*T, 0.0)),
    Point(Coor(x,y + 1,z), Value(c2*T, 0.0)),
    Point(Coor(x,y,z + 1), Value(c2*T, 0.0)),
    Point(Coor(x - 1,y,z), Value(c2*T, 0.0)),
    Point(Coor(x,y - 1,z), Value(c2*T, 0.0)),
    Point(Coor(x,y,z - 1), Value(c2*T, 0.0))
  ).filter(q => !isOutBox(q.coor))
}

Since we prepare the data in the expand function, the reduce function is quite simple: we can just sum the values.

// isBoundary and Boundary (the boundary condition) are assumed to be defined elsewhere
def reduce(coor: Coor, values: List[Point]): Point =
  if (isBoundary(coor)) Point(coor, Boundary(coor))
  else Point(coor, values
    .map { case Point(_, v) => v }
    .foldLeft(Value(0.0, 0.0)) {
      (R, e) => Value(R.T + e.T, R.Q + e.Q)
    })

That’s all, clean and simple …
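For completeness, here is a minimal sketch of a driver. The lattice size, number of steps and the Spark context sc are hypothetical, and in practice one would checkpoint periodically to keep the RDD lineage under control:

// Hypothetical driver: build an initial field and iterate the update step
val n = 32                 // lattice points per dimension (illustrative)
val nSteps = 1000          // number of time steps (illustrative)
val initialField: RDD[Point] = sc.parallelize(
  for {
    x <- 0 until n
    y <- 0 until n
    z <- 0 until n
  } yield Point(Coor(x, y, z), Value(0.0, 0.0)))

val finalField = (1 to nSteps).foldLeft(initialField) {
  (field, _) => update(field, expand, reduce)
}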

In these links (heatbath_Spark, heatbath_Flink) you can find a complete implementation for both technologies. As you can see, they are ~90% identical.

I don’t want to blur this post with performance comparisons between Spark and Flink, because that is an interesting topic in itself and I would like to treat it in a future post.