Real-time Image Processing using Apache Flink

In this post I am going to explain how to build a (quick and dirty) real-time image processing stream using Apache Flink (the same thing can easily be done with Apache Spark).

I am going to focus first on convolutions for both matrices and functions. The idea is as follows:

  • Images are read from Apache Kafka with the following format:

imageName;frame;x;y;value

  • Images are processed using Apache Flink
  • Processed images are streamed back through Apache Kafka
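Before wiring anything to Flink, it helps to pin down what the pipeline computes. Here is a minimal, non-streaming sketch of a 2D convolution with wrap-around (periodic) boundaries, the same boundary handling the streaming code below uses via modulo arithmetic. The helper name `convolve2D` is mine, not part of the pipeline:

```scala
// Plain 2D convolution over an in-memory image with periodic boundaries.
// This is the semantics the streaming version has to reproduce.
def convolve2D(img: Array[Array[Double]],
               kernel: Array[Array[Double]]): Array[Array[Double]] = {
  val (h, w)   = (img.length, img(0).length)
  val (kh, kw) = (kernel.length, kernel(0).length)
  val (cy, cx) = ((kh - 1) / 2, (kw - 1) / 2)   // kernel center
  Array.tabulate(h, w) { (y, x) =>
    (for {
      ky <- 0 until kh
      kx <- 0 until kw
    } yield img((y + ky - cy + h) % h)((x + kx - cx + w) % w) * kernel(ky)(kx)).sum
  }
}

// Convolving with the identity kernel returns the image unchanged
val img = Array(Array(1.0, 2.0), Array(3.0, 4.0))
val id  = Array(Array(0.0, 0.0, 0.0), Array(0.0, 1.0, 0.0), Array(0.0, 0.0, 0.0))
val out = convolve2D(img, id)
```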

First of all, let’s create a case class for pixels:

case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double) {
  lazy val keyFrameXY = name + "-" + frame + "-" + x + "-" + y
}

object Pixel {
  def apply(s: String): Pixel = {
    val stringList = s.split(';')
    Pixel(stringList(0), stringList(1).toInt,
          stringList(2).toInt, stringList(3).toInt,
          stringList(4).toDouble)
  }
}
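A quick round trip shows the parser at work (the image name and values are made up; the case class is restated so the snippet is self-contained):

```scala
case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double) {
  lazy val keyFrameXY = name + "-" + frame + "-" + x + "-" + y
}

object Pixel {
  def apply(s: String): Pixel = {
    val stringList = s.split(';')
    Pixel(stringList(0), stringList(1).toInt,
          stringList(2).toInt, stringList(3).toInt,
          stringList(4).toDouble)
  }
}

// Parse one record in the wire format described above
val p = Pixel("lena;0;3;4;0.5")
// p == Pixel("lena", 0, 3, 4, 0.5), p.keyFrameXY == "lena-0-3-4"
```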

In order to implement convolution we are going to use the flatMapReduce method introduced in my previous post. A simple implementation looks like this:

import breeze.linalg.DenseMatrix
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

def convolutionMatrix(m: DenseMatrix[Double])(in: DataStream[Pixel])
    : DataStream[Pixel] = {
  // The kernel must have a well-defined center
  require(m.rows % 2 == 1 && m.cols % 2 == 1)
  val area = m.cols * m.rows

  def expand(p: Pixel): TraversableOnce[Pixel] = ???

  class reduce extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
    override def apply(key: String, window: GlobalWindow,
                       input: Iterable[Pixel],
                       out: Collector[Pixel]): Unit = ???
  }

  in.flatMap(x => expand(x))
    .keyBy(_.keyFrameXY)
    .countWindow(area)
    .apply(new reduce)
}

We need to implement the expand and reduce functions. For each incoming pixel, the expand function creates one copy per pixel in the kernel neighborhood, each carrying the corresponding weighted value. You can write something like:

def expand(p: Pixel): TraversableOnce[Pixel] = {
  val sizeX = (m.rows - 1) / 2
  val sizeY = (m.cols - 1) / 2
  // pointsX and pointsY (the image width and height) are assumed in scope
  for {
    x <- -sizeX to sizeX
    y <- -sizeY to sizeY
  } yield Pixel(p.name, p.frame,
                (p.x - x + pointsX) % pointsX,
                (p.y - y + pointsY) % pointsY,
                p.v * m(sizeX + x, sizeY + y))
}
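To see what expand produces, here is a standalone sketch with concrete, made-up values: a 3x3 all-ones kernel on a 4x4 periodic image, where `pointsX` and `pointsY` stand in for the image width and height:

```scala
case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double)

val (pointsX, pointsY) = (4, 4)        // image dimensions (assumed)
val (sizeX, sizeY)     = (1, 1)        // half-widths of a 3x3 kernel
def m(i: Int, j: Int): Double = 1.0    // all-ones kernel stands in for the matrix

// One copy per kernel cell, re-addressed (with wrap-around) to the
// neighbor the incoming pixel contributes to
def expand(p: Pixel): Seq[Pixel] =
  for {
    x <- -sizeX to sizeX
    y <- -sizeY to sizeY
  } yield Pixel(p.name, p.frame,
                (p.x - x + pointsX) % pointsX,
                (p.y - y + pointsY) % pointsY,
                p.v * m(sizeX + x, sizeY + y))

val copies = expand(Pixel("img", 0, 0, 0, 1.0))
// 9 copies, one per kernel cell, all coordinates wrapped into the image
```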

The reduce function is very simple: it just sums all the values that share the same pixel coordinates.

class reduce extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
  override def apply(key: String,
                     window: GlobalWindow,
                     input: Iterable[Pixel],
                     out: Collector[Pixel]): Unit =
    out.collect(
      Pixel(input.head.name,
            input.head.frame,
            input.head.x, input.head.y,
            input.map(_.v).sum))
}
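Outside Flink, the same keyed aggregation can be sketched with a plain groupBy, which makes the reduce step easy to check in isolation (the sample pixels are made up):

```scala
case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double) {
  lazy val keyFrameXY = name + "-" + frame + "-" + x + "-" + y
}

// Two contributions land on pixel (1,1); one lands on pixel (2,2)
val contributions = Seq(
  Pixel("img", 0, 1, 1, 0.2),
  Pixel("img", 0, 1, 1, 0.3),
  Pixel("img", 0, 2, 2, 1.0))

// Group by the pixel key and sum the weighted values, exactly what the
// windowed reduce does per count window
val reduced = contributions
  .groupBy(_.keyFrameXY)
  .map { case (_, ps) =>
    Pixel(ps.head.name, ps.head.frame, ps.head.x, ps.head.y, ps.map(_.v).sum)
  }
  .toSeq
```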

Replacing matrix convolution with function convolution is trivial; you can write:

def convolutionFunction
    (f: (Int, Int) => Double, sizeX: Int, sizeY: Int)
    (in: DataStream[Pixel]): DataStream[Pixel] = {

  val area = (2 * sizeX + 1) * (2 * sizeY + 1)

  // Expand function
  def expand(p: Pixel): TraversableOnce[Pixel] = {
    for {
      x <- -sizeX to sizeX
      y <- -sizeY to sizeY
    } yield Pixel(p.name, p.frame,
                  (p.x - x + pointsX) % pointsX,
                  (p.y - y + pointsY) % pointsY,
                  p.v * f(x, y))
  }

class reduce
  extends WindowFunction[Pixel, Pixel,String, GlobalWindow] {

  override def apply(key: String, window: GlobalWindow,
                     input: Iterable[Pixel],
                     out: Collector[Pixel]):Unit = {

 out.collect(
Pixel(input.head.name,
      input.head.frame,
      input.head.x,
      input.head.y,
      input.map(_.v).sum))
  }
}

in.flatMap( x => expand(x) )
  .keyBy(_.keyFrameXY)
  .countWindow(area)
  .apply(new reduce )
}

Very easy, isn’t it? Now we can create the main function:

def main(args: Array[String]): Unit = {
// Define some kernels
// Gaussian
def Gaussian(s: Double)(x: Int, y: Int): Double =
  1.0 / (2 * math.Pi * s * s) * math.exp(-(x * x + y * y) / (2.0 * s * s))
// Laplacian of Gaussian
def LoG(s: Double)(x: Int, y: Int): Double =
  1.0 / (math.Pi * math.pow(s, 4)) *
  (1.0 - (x * x + y * y) / (2.0 * s * s)) *
  math.exp(-(x * x + y * y) / (2.0 * s * s))
// Blur
val blur = DenseMatrix.create(3,3,
 Array(0.0,0.2,0.0,
       0.2,0.2,0.2,
       0.0,0.2,0.0))
// Start streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read data from Kafka
val rawStream = env
   .addSource(new FlinkKafkaConsumer09[String]("imageIn",
                       new SimpleStringSchema(), properties))
// From String to Pixels
val pixelStream: DataStream[Pixel] = rawStream.map(Pixel(_) )
// Matrix Convolution
val pixelStreamMatrix = convolutionMatrix(blur)(pixelStream)
// Function Convolution
val LoG14: (Int,Int) => Double = LoG(1.4)
val pixelStreamFunction = convolutionFunction(LoG14 ,5,5)(pixelStream)

// New data to Kafka: serialize the pixels back to the wire format first
pixelStreamMatrix
  .map(p => s"${p.name};${p.frame};${p.x};${p.y};${p.v}")
  .addSink(new FlinkKafkaProducer09[String]("localhost:9092",
                      "imageMatrix", new SimpleStringSchema))
pixelStreamFunction
  .map(p => s"${p.name};${p.frame};${p.x};${p.y};${p.v}")
  .addSink(...)

env.execute("Image Processing using Apache Flink")
}
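As a quick sanity check (this snippet is mine, not part of the pipeline), a discretized Gaussian kernel with the parameters used above (s = 1.4, an 11x11 window) should sum to roughly 1, since the window covers several standard deviations:

```scala
// Gaussian kernel as defined in the main function above
def gaussian(s: Double)(x: Int, y: Int): Double =
  1.0 / (2 * math.Pi * s * s) * math.exp(-(x * x + y * y) / (2.0 * s * s))

val s = 1.4
val r = 5   // half-width used with convolutionFunction above

// Sum the kernel over the 11x11 integer grid; the tails outside are negligible
val total = (for {
  x <- -r to r
  y <- -r to r
} yield gaussian(s)(x, y)).sum
// total is close to 1.0
```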

Author: alfonso.sastre

I am a scientist moving from Theoretical Physics to Data Science. Interested in solving problems, either with a computer (or thousands of them) or on the blackboard (I enjoy blackboard discussions). Always exploring new technologies, currently focused on functional programming and on porting numerical algorithms (not just machine learning) to Big Data technologies. I am also interested in energy optimization at both the hardware and software level. Besides science and technology, I enjoy rock climbing and, above all, being in the mountains (by the way, if you like rock climbing you may want to visit my website www.climbeando.com).
