मेरे पास इस तरह एक स्कैला कोड है

 def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
    val currentTimeStamp = list(1).toLong // loads the timestamp column
    var sum = 0.0
    var count = 0
    var check = false
    import scala.util.control.Breaks._
    breakable {
      for (array <- buffer) {
        val toCheckTimeStamp = array(1).toLong // timestamp column
        if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // to check the timestamp for 10 seconds difference
          sum += array(5).toDouble // RSSI weightage values
          count += 1
        }

        if ((currentTimeStamp - 10L) > toCheckTimeStamp) {

          check = true
          break

        }
      }
    }
     list :+ sum

  }

मैं उपरोक्त फ़ंक्शन को इस तरह कॉल करूंगा

 import spark.implicits._
  val averageDF =
    filterop.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
      .sortBy(array => array(1), false) // Sort by timestamp
      .groupBy(array => (array(0), array(2))) // group by tag and listner
      .mapValues(buffer => {
        buffer.map(list => {
         avgCalc(buffer, list) // calling the average function 
        })
      })
      .flatMap(x => x._2)
      .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble)) // defining the schema through case class
      .toDF // converting to data frame

उपरोक्त कोड ठीक काम कर रहा है। लेकिन मुझे सूची से छुटकारा पाने की जरूरत है। मेरे वरिष्ठ मुझसे सूची को हटाने के लिए कहते हैं, क्योंकि सूची निष्पादन की गति को कम करती है। सूची के बिना आगे बढ़ने के लिए कोई सुझाव? किसी भी तरह की सहायता को आभार समझेंगे।

3
Jessi joseph 27 जुलाई 2017, 08:06

2 जवाब

सबसे बढ़िया उत्तर

मुझे लगता है कि निम्नलिखित समाधान काम करना चाहिए, मैंने चलने योग्य और एक सरणी दोनों को पारित करने से बचने की कोशिश की है।

def avgCalc(buffer: Iterable[Array[String]]) = {
  var finalArray = Array.empty[Array[String]]
  import scala.util.control.Breaks._
  breakable {
    for (outerArray <- buffer) {
      val currentTimeStamp = outerArray(1).toLong
      var sum = 0.0
      var count = 0
      var check = false
      var list = outerArray
      for (array <- buffer) {
        val toCheckTimeStamp = array(1).toLong
        if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
          sum += array(5).toDouble
          count += 1
        }
        if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
          check = true
          break
        }
      }
      if (sum != 0.0 && check) list = list :+ (sum / count).toString
      else list = list :+ list(5).toDouble.toString

      finalArray ++= Array(list)
    }
  }
  finalArray
}

और आप इसे पसंद कर सकते हैं

import sqlContext.implicits._
val averageDF =
  filter_op.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .sortBy(array => array(1), false)
    .groupBy(array => (array(0), array(2)))
    .mapValues(buffer => {
        avgCalc(buffer)
    })
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF

मुझे आशा है कि यह वांछित उत्तर है

4
Ramesh Maharjan 27 जुलाई 2017, 12:50

मैं देख सकता हूं कि आपने एक उत्तर स्वीकार कर लिया है, लेकिन मेरा कहना है कि आपके पास बहुत सारे अनावश्यक कोड हैं। जहाँ तक मैं देख सकता हूँ, आपके पास Array प्रकार में प्रारंभिक रूपांतरण करने का कोई कारण नहीं है और इस बिंदु पर sortBy भी अनावश्यक है। मेरा सुझाव है कि आप सीधे Row पर काम करें।

इसके अलावा आपके पास कई अप्रयुक्त चर हैं जिन्हें हटाया जाना चाहिए और केवल एक केस-क्लास में कनवर्ट करना toDF के बाद अत्यधिक IMHO लगता है।

मैं ऐसा कुछ करूंगा:

import org.apache.spark.sql.Row

def avgCalc(sortedList: List[Row]) = {
  sortedList.indices.map(i =>  {
    var sum = 0.0
    val row = sortedList(i)
    val currentTimeStamp = row.getString(1).toLong // loads the timestamp column

    import scala.util.control.Breaks._
    breakable {
      for (j <- 0 until sortedList.length) {
        if (j != i) {
          val anotherRow = sortedList(j)
          val toCheckTimeStamp = anotherRow.getString(1).toLong // timestamp column
          if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // to check the timestamp for 10 seconds difference
            sum += anotherRow.getString(5).toDouble // RSSI weightage values
          }

          if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
            break
          }
        }
      }
    }
    (row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4), row.getString(5), sum.toString)
  })
}


val averageDF = filterop.rdd
  .groupBy(row => (row(0), row(2)))
  .flatMap{case(_,buffer) => avgCalc(buffer.toList.sortBy(_.getString(1).toLong))}
  .toDF("Tag", "Timestamp", "Listner", "X", "Y", "RSSI", "AvgCalc")

और एक अंतिम टिप्पणी के रूप में, मुझे पूरा यकीन है कि avgCalc फ़ंक्शन के अच्छे/क्लीनर कार्यान्वयन के साथ आना संभव है, लेकिन मैं इसे आपके साथ खेलने के लिए छोड़ दूँगा :)

1
Glennie Helles Sindholt 27 जुलाई 2017, 15:13