Want to learn live streaming data processing? The easiest way is to create a Twitter developer app and follow the code below to ingest data and store it in AWS S3 for further analysis and processing with tools like Amazon EMR, or for machine learning projects.
Install the Trust app and click the arrow button to switch to business-related services, then click “Job Mock Interviewer” → “AWS job” to connect with me and learn deeper concepts of Spark/Scala/Java/AWS at no cost. You may also outsource your tech-screening interview process to me. The Trust Business app is available only in India; everyone else, please visit the “About me” page of this blog to get my LinkedIn contact.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

/**
 * Spark Streaming job that ingests live tweets through the Twitter streaming
 * API, keeps only English-language tweets, and prints each status to stdout
 * (replace the print sink with an S3 writer for persistence).
 *
 * Usage: every command-line argument is treated as a Twitter "track" filter
 * keyword; with no arguments the full sample stream is consumed.
 *
 * @author Gyanendra
 * @since 08/12/19
 */
object TweeterStreamReaderApp {

  def main(args: Array[String]): Unit = {
    // SECURITY: OAuth credentials must never be hard-coded in source control.
    // They are read from the environment first; the literals below are kept
    // only as a backward-compatible fallback and should be revoked/rotated.
    val consumerKey       = sys.env.getOrElse("TWITTER_CONSUMER_KEY", "gA7xFE3S1QfVTN55Uuzb")
    val consumerSecret    = sys.env.getOrElse("TWITTER_CONSUMER_SECRET", "2te2Z1yFvynXcp06rc2j3zg38tNAa1zY29rOT3d5BFI")
    val accessToken       = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", "1063309360480-61DChczOivazJZTWodLfuRRW8gDNfJ")
    val accessTokenSecret = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", "bFYPmpiWhFgOtdJGe95YyhOntxOQAmx0xEYtF")

    // BUG FIX: the original computed `args.takeRight(args.length - 4)`, a
    // leftover from the stock Spark example where args(0..3) held credentials.
    // Credentials are no longer passed on the CLI, so every arg is a filter.
    val filters = args.toSeq

    val conf = new SparkConf()
      .setAppName("TweeterStreamReader")
      .setMaster("local[2]") // local mode with 2 threads: 1 receiver + 1 processor

    // 5-second micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))

    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
    val auth = new OAuthAuthorization(cb.build)

    val tweets = TwitterUtils.createStream(ssc, Some(auth), filters)

    // Keep only tweets whose language tag (as reported by Twitter) is English.
    val englishTweets = tweets.filter(_.getLang == "en")

    // BUG FIX: DStream.repartition returns a NEW stream; the original called
    // it and discarded the result, so the repartition never took effect.
    val singlePartition = englishTweets.repartition(1)

    // Print every status in each micro-batch. Further you can store this to S3.
    singlePartition.foreachRDD { (rdd, _) =>
      rdd.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Download this code from my repository.