博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
sparkStreaming SQL黑名单过滤
阅读量:7115 次
发布时间:2019-06-28

本文共 2413 字,大约阅读时间需要 8 分钟。

hot3.png

1.目的

    在线过滤掉黑名单的点击,防止刷点击刷评分刷票数等行为

2.素材

    1)mysql建立blacklist表

mysql> select * from blacklist;+--------+--------+| name   | status |+--------+--------+| hadoop | true   || spark  | true   |+--------+--------+

    2) socket输入模拟点击log

        启动linux上的netcat程序

            nc -lk 9999

        输入字符

            20170901141258  tom

            20170901141301  hadoop
            20170901141306  jesse

3.代码

/**  * Created by puwenchao on 2017-09-06.  */package Streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.sql.SQLContextcase class AD(time:String,name: String)object streaming_blacklist_sql {  def main(args:Array[(String)]): Unit ={    //设定日志等级    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)    //创建上下文,设置batch时间间隔5s    val conf = new SparkConf().setAppName("streaming_blacklist_sql").setMaster("local[4]")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    val ssc = new StreamingContext(sc, Seconds(5))    import sqlContext.implicits._    //从数据库中加载blacklist表    val blacklistDF  = sqlContext.read.format("jdbc").options(Map(      "url"-> "jdbc:mysql://192.168.252.141:3306/test",      "driver"->"com.mysql.jdbc.Driver",      "dbtable"->"blacklist",      "user"->"root",      "password"-> "mysql"    )).load()    blacklistDF .registerTempTable("blicklist")    //从socket中接收广告点击数据,并转为case class中定义的那种格式    val adsClick = ssc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)    val adsClickPar = adsClick.map(_.split(" ")).map(ads => AD(ads(0), ads(1)))    // 隐式转换为DF并注册为临时表,再用SQL语句查询并打印出来    adsClickPar.foreachRDD ( ads => {      ads.toDF().registerTempTable("adclick")      val sql_str = "select a.time,a.name " +                      "from adclick a left join blicklist b " +                        "on a.name=b.name " +                     "where b.status is null"      val normalClick = sqlContext.sql(sql_str)      normalClick.show()      }    )    //计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费    ssc.start()    ssc.awaitTermination()  }}

4.输出

+---------------------+------+

|                    time|name|
+---------------------+------+
|20170901141258|  tom|
+---------------------+------+
|20170901141306|jesse|
+---------------------+------+

转载于:https://my.oschina.net/puwenchao/blog/1546202

你可能感兴趣的文章
细说浏览器特性检测(1)-jQuery1.4添加部分
查看>>
C errno是否是线程安全的
查看>>
类的初始化
查看>>
百度AI开放平台 UNIT平台开发在线客服 借助百度的人工智能如何开发一个在线客服系统...
查看>>
python大战机器学习——半监督学习
查看>>
ethereum/EIPs-1271 smart contract
查看>>
ADempiere3.6.0LTS - 重新导入会计科目(基于Ubuntu Desktop 12.04 LTS)
查看>>
Project Euler Problem 48: Self powers
查看>>
python一个小程序:猜数字
查看>>
转:web.xml 配置中classpath: 与classpath*:的区别
查看>>
vue-自定义组件传
查看>>
由用户反映DroidPilot安装之后,License没有同步安装 - 解决办法
查看>>
java发展史与java的语言特性
查看>>
trunk端口配置错误导致环路
查看>>
《坟·题记》
查看>>
小女也爱c#(1)
查看>>
洛谷P4070 生成魔咒
查看>>
linux下挂载NTFS分区错误修复
查看>>
是C太傻逼?还是C++不够傻逼;
查看>>
Anaconda使用总结
查看>>