Real-time Image Processing using Apache Flink

In this post I am going to explain how to build a (quick and dirty) real-time image processing pipeline using Apache Flink (the same thing can be done easily with Apache Spark).

I am going to focus first on convolutions for both matrices and functions. The idea is as follows:

  • Images are read from Apache Kafka, one pixel per message, as semicolon-separated records (name;frame;x;y;value)
  • Images are processed using Apache Flink
  • Processed images are streamed back out through Apache Kafka

First of all, let’s create a case class for pixels:

case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double) {
  lazy val keyFrameXY = name + "-" + frame + "-" + x + "-" + y
}

object Pixel {
  def apply(s: String): Pixel = {
    val stringList = s.split(';')
    Pixel(stringList(0), stringList(1).toInt,
          stringList(2).toInt, stringList(3).toInt,
          stringList(4).toDouble)
  }
}
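To check the parsing logic on a record, here is a self-contained version of the same idea (the record `"cat.png;0;10;20;0.5"` is a hypothetical example, assuming the name;frame;x;y;value format):

```scala
// Self-contained sketch of the pixel parser; mirrors Pixel.apply above.
case class Pixel(name: String, frame: Int, x: Int, y: Int, v: Double)

def parsePixel(s: String): Pixel = {
  val f = s.split(';')
  Pixel(f(0), f(1).toInt, f(2).toInt, f(3).toInt, f(4).toDouble)
}

val p = parsePixel("cat.png;0;10;20;0.5")
```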

In order to implement convolution we are going to use the flatMapReduce method introduced in my previous post. A simple implementation looks like this:

def convolutionMatrix(m: DenseMatrix[Double])(in: DataStream[Pixel])
  : DataStream[Pixel] = {
  require(m.rows % 2 == 1 && m.cols % 2 == 1)
  val area = m.cols * m.rows
  def expand(p: Pixel): TraversableOnce[Pixel] = ???
  class reduce
    extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
    def apply(key: String, window: GlobalWindow, input: Iterable[Pixel],
              out: Collector[Pixel]): Unit = ???
  }
  in.flatMap( x => expand(x) )
    // group all copies of the same target pixel and wait for all `area` contributions
    .keyBy( _.keyFrameXY )
    .countWindow(area)
    .apply(new reduce)
}

We need to implement the expand and reduce functions. For each incoming pixel, the expand function creates one copy per pixel in its neighborhood, weighted by the corresponding kernel value. You can write something like:

def expand(p: Pixel): TraversableOnce[Pixel] = {
  val sizeX = (m.rows - 1) / 2
  val sizeY = (m.cols - 1) / 2
  // pointsX, pointsY: image dimensions (periodic boundary conditions)
  for {
    x <- -sizeX to sizeX
    y <- -sizeY to sizeY
  } yield Pixel(p.name, p.frame,
                (p.x - x + pointsX) % pointsX,
                (p.y - y + pointsY) % pointsY,
                p.v * m(sizeX + x, sizeY + y))
}

The reduce function is very simple: it just sums all the values with the same pixel coordinates:

class reduce
  extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
  def apply(key: String,
            window: GlobalWindow,
            input: Iterable[Pixel],
            out: Collector[Pixel]): Unit = {
    val p = input.head
    out.collect(Pixel(p.name, p.frame, p.x, p.y, input.map(_.v).sum))
  }
}

Changing from matrix convolution to function convolution is trivial; you can write:

def convolutionFunction
  (f: (Int, Int) => Double, sizeX: Int, sizeY: Int)
  (in: DataStream[Pixel]): DataStream[Pixel] = {

  val area = (2*sizeX + 1)*(2*sizeY + 1)
  // Expand function
  def expand(p: Pixel): TraversableOnce[Pixel] = {
    for {
      x <- -sizeX to sizeX
      y <- -sizeY to sizeY
    } yield Pixel(p.name, p.frame,
                  (p.x - x + pointsX) % pointsX,
                  (p.y - y + pointsY) % pointsY,
                  p.v * f(x, y))
  }
  // Reduce function: sum the values with the same coordinates
  class reduce
    extends WindowFunction[Pixel, Pixel, String, GlobalWindow] {
    override def apply(key: String, window: GlobalWindow,
                       input: Iterable[Pixel],
                       out: Collector[Pixel]): Unit = {
      val p = input.head
      out.collect(Pixel(p.name, p.frame, p.x, p.y, input.map(_.v).sum))
    }
  }

  in.flatMap( x => expand(x) )
    .keyBy( _.keyFrameXY )
    .countWindow(area)
    .apply(new reduce)
}

It is very easy, isn’t it? Now we can create the main function:

def main(args: Array[String]): Unit = {
  // Define some kernels
  // Gaussian
  def Gaussian(s: Double)(x: Int, y: Int): Double =
    1.0 / 2 / math.Pi / s / s * math.exp( -(x*x + y*y)/2.0/s/s )
  // Laplacian of Gaussian
  def LoG(s: Double)(x: Int, y: Int): Double =
    1.0 / math.Pi / math.pow(s,4) *
    (1.0 - (x*x + y*y)/2.0/s/s) * math.exp( -(x*x + y*y)/2.0/s/s )
  // Blur: 3x3 averaging kernel
  val matrixBlur = DenseMatrix.create(3, 3, Array.fill(9)(1.0/9.0))
  // Start streaming environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Read data from Kafka
  val rawStream = env
    .addSource(new FlinkKafkaConsumer09[String]("imageIn",
                       new SimpleStringSchema(), properties))
  // From String to Pixels
  val pixelStream: DataStream[Pixel] = rawStream.map( Pixel(_) )
  // Matrix convolution
  val pixelStreamMatrix = convolutionMatrix(matrixBlur)(pixelStream)
  // Function convolution
  val LoG14: (Int, Int) => Double = LoG(1.4)
  val pixelStreamFunction = convolutionFunction(LoG14, 5, 5)(pixelStream)

  // New data to Kafka (serialised back to the name;frame;x;y;value format)
  pixelStreamMatrix
    .map( p => p.name + ";" + p.frame + ";" + p.x + ";" + p.y + ";" + p.v )
    .addSink(new FlinkKafkaProducer09[String]("localhost:9092",
                       "imageMatrix", new SimpleStringSchema()))

  env.execute("Image Processing using Apache Flink")
}
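As a quick sanity check, the Gaussian kernel above should sum to approximately 1 over a neighbourhood a few s wide, so convolving with it (approximately) preserves total brightness. Here is a small standalone check with s = 1.4 on the ±5 grid used in the convolutionFunction call:

```scala
// Same Gaussian as in the post: a 2-D normal density sampled at integer points.
def Gaussian(s: Double)(x: Int, y: Int): Double =
  1.0 / 2 / math.Pi / s / s * math.exp(-(x*x + y*y) / 2.0 / s / s)

// Sum over the (2*5+1) x (2*5+1) window used by convolutionFunction(LoG14, 5, 5).
val sum = (for {
  x <- -5 to 5
  y <- -5 to 5
} yield Gaussian(1.4)(x, y)).sum
```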

Big Data beyond Machine Learning: a heating example

Open source big data technologies (BDT) like Apache Spark or Apache Flink are widely used for distributed processing of huge amounts of data (mostly counting words ;)).

On the other hand, complex data analysis can be “easily” performed using open source libraries (e.g. machine learning frameworks like scikit-learn or caret) in languages like Python or R. These libraries are usually based on optimized libraries written in low-level languages like C or Fortran, while providing a friendlier coding style.

Unfortunately, combining both and doing distributed complex data analysis à la big data is not as mature a topic as one would like. In my opinion, an Achilles’ heel of BDT is the absence of optimized distributed linear algebra libraries like those found in High Performance Computing (HPC) (e.g. PBLAS, ScaLAPACK, PARPACK). But that is another story and shall be told another time …

In this post, I would like to explore the possibilities that big data technologies (will) provide to perform distributed numerical simulations. Numerical simulation is a broad field, so let’s consider a concrete example, the heat equation:

\frac{\partial T}{\partial t} = \alpha\nabla^2 T + \frac{1}{c_p\rho}q

This equation is deeply connected with many interesting problems in physics, chemistry and financial mathematics, so I think it is an interesting starting point.

Taking a naïve approach, let me use the explicit form to integrate the heat equation. After a bit of algebra one obtains:

T^{n+1}_{i,j,k} = (1 - 6 r)T^{n}_{i,j,k} + r'q_{i,j,k} +  r \sum_{m=\pm 1}(T^{n}_{i+m,j,k}+T^{n}_{i,j+m,k}+T^{n}_{i,j,k+m}) 

where r = \frac{\alpha a_t}{a_x^2} and r' = \frac{a_t}{c_p\rho}, with a_t the time step and a_x the lattice spacing.
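To make the update rule concrete, here is a single-point step in plain Scala (a sketch: r and r' are the coefficients just defined, T is a 3-D array indexed as T(i)(j)(k), and the point is assumed to be interior):

```scala
// One explicit step of the heat equation at interior point (i, j, k), following
// T^{n+1} = (1 - 6r) T + r' q + r * (sum of the six nearest neighbours).
def step(T: Array[Array[Array[Double]]], q: Double,
         i: Int, j: Int, k: Int, r: Double, rp: Double): Double =
  (1 - 6*r) * T(i)(j)(k) + rp * q +
  r * (T(i+1)(j)(k) + T(i-1)(j)(k) +
       T(i)(j+1)(k) + T(i)(j-1)(k) +
       T(i)(j)(k+1) + T(i)(j)(k-1))
```

A uniform temperature field with no source is stationary: with all T = 1 and q = 0, the step returns 1 again, which is a handy correctness check.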

Leaving aside stability and precision problems, we would like to integrate this equation in a parallel distributed way. If you have been in touch with HPC and MPI, a straightforward strategy could be:

  1. Split the lattice into blocks (points and boundary terms).
  2. Assign each block to a processor.
  3. Integrate the points.
  4. Update the boundary.
  5. Iterate.

Unfortunately, using BDT we cannot do that. In this case, data are distributed randomly across the nodes. In principle, you know neither who holds each piece of data nor who holds its neighbours. So we have to think of a parallel algorithm under this assumption. What about this?:

  1. Consider a configuration as pairs: (coordinate, values).
  2. For each pair, create copies with coordinates corresponding to its neighbours.
  3. Group all the pairs with the same coordinates.
  4. For each group, operate to obtain the updated value.
  5. Iterate.
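Steps 1 to 4 can be sketched with plain Scala collections before moving to an RDD. This is a hypothetical toy example on a periodic 1-D lattice with a simple averaging rule, not the post's heat-equation code:

```scala
// Step 1: a configuration as (coordinate, value) pairs on a periodic 1-D lattice.
val n = 4
val field: List[(Int, Double)] = List(0 -> 1.0, 1 -> 0.0, 2 -> 0.0, 3 -> 0.0)

// Step 2: each pair sends weighted copies to itself and its two neighbours
// (toy rule: weight 1/2 for itself, 1/4 per neighbour).
def expand(p: (Int, Double)): List[(Int, Double)] = {
  val (x, v) = p
  List((x, v / 2), ((x + 1) % n, v / 4), ((x - 1 + n) % n, v / 4))
}

// Steps 3 and 4: group by coordinate and combine the contributions.
val updated: Map[Int, Double] =
  field.flatMap(expand)
       .groupBy(_._1)
       .map { case (coor, copies) => coor -> copies.map(_._2).sum }
```

Note that the total "heat" is conserved: the weights sum to 1, so after the update the values still add up to 1.0.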

Maybe this sounds a bit weird if you are coming from HPC, but in terms of the functional programming paradigm used by BDT it seems quite a natural approach. In addition, there is an extra bonus: coding these functions in BDT is extremely simple.

Let’s see how to do this using Scala. First of all, we define a general function for a single iteration:

// Create three case classes to organise the data
case class Coor(x: Int, y: Int, z: Int)
case class Value(T: Double, Q: Double)
case class Point(coor: Coor, value: Value)

// Define a function to do a single iteration
def update(field: RDD[Point],
           expand: Point => List[Point],
           reduce: (Coor, List[Point]) => Point): RDD[Point] =
  field.flatMap( expand(_) )
       .groupBy( _.coor )
       .map { case (coor, values) => reduce(coor, values.toList) }

What is this function doing?

Let’s understand the signature first: the function receives a set of Points (field), a function (expand) that takes a Point and returns a set of Points, and a function (reduce) that takes a Coor and a set of Points and returns a Point; finally, update returns a set of Points.

The body of the function is very succinct, and it can be related directly to steps 2-4 defined previously.

This function is quite general, so one can use it to solve other differential equations, Monte Carlo simulations, … just by defining the proper expand and reduce functions (yeah … sometimes functional programming is amazing).

In this case we are considering the heat equation, so let’s give an example of the expand and reduce functions. The expand function receives a Point and returns a list of Points with the coordinates of its neighbours and values according to Eq. (2). Here the neighbours are the first neighbours in each direction, so we can write:

def expand(p: Point): List[Point] = {
  // c1 = 1 - 6r, c2 = r, c3 = r' (see Eq. (2))
  val Point(Coor(x,y,z), Value(T,Q)) = p
  List(
    Point(Coor(x,y,z),     Value(c1*T + c3*Q, Q)),
    Point(Coor(x + 1,y,z), Value(c2*T, 0.0)),
    Point(Coor(x,y + 1,z), Value(c2*T, 0.0)),
    Point(Coor(x,y,z + 1), Value(c2*T, 0.0)),
    Point(Coor(x - 1,y,z), Value(c2*T, 0.0)),
    Point(Coor(x,y - 1,z), Value(c2*T, 0.0)),
    Point(Coor(x,y,z - 1), Value(c2*T, 0.0))
  ).filter( p => !isOutBox(p.coor) )
}

Since we prepare the data in the expand function, the reduce function is quite simple: we can just sum the values:

def reduce(coor: Coor, values: List[Point]): Point =
  if (isBoundary(coor)) Point(coor, Boundary(coor))
  else Point(coor, values
    .map( _.value )
    .reduce( (R, e) => Value(R.T + e.T, R.Q + e.Q) ))

That’s all, clean and simple …
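Step 5 (iterate) is then just repeated application of update. Generically it can be sketched as a fold (the step function here is a placeholder standing in for update(field, expand, reduce), whose field type would be RDD[Point]):

```scala
// Iteration as a fold: apply one update step a fixed number of times.
def iterate[A](initial: A, steps: Int)(update: A => A): A =
  (1 to steps).foldLeft(initial)((state, _) => update(state))

// Toy usage with a plain Double standing in for the field: three halvings.
val cooled = iterate(16.0, 3)(t => t / 2.0)
```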

In these links (heatbath_Spark, heatbath_Flink) you can find complete implementations for both technologies. As you can see, they are ~90% identical.

I don’t want to blur this post with efficiency comparisons between Spark and Flink, because that is an interesting topic in itself and I would like to treat it in a future post.