- 浏览: 2569127 次
- 性别:
- 来自: 成都
-
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
Spark 2017 BigData Update(5)Spark Streaming in Java
The Streaming Small example Class, WordCountStreamingApp.java
package com.sillycat.sparkjava.app;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.sillycat.sparkjava.base.SparkBaseApp;
public class WordCountStreamingApp extends SparkBaseApp {
private static final long serialVersionUID = 7401544141510431796L;
protected String getAppName() {
return "WordCountStreamingApp";
}
public void executeTask(List<String> params) {
SparkConf conf = this.getSparkConf();
// The time interval at which streaming data will be divided into
// batches
logger.info("Start to have the streaming");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(30000));
ssc.checkpoint(this.getAppName());
logger.info("Prepare the resource for streaming");
processStream(ssc, "carl");
logger.info("Streaming is working");
try {
ssc.start();
ssc.awaitTermination();
} catch (InterruptedException e) {
logger.error("InterruptedException:", e);
}
}
private void processStream(JavaStreamingContext ssc, String keyword) {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "WordCountStreamingApp");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("sillycat-topic");
logger.info("Init the Kafka Clients to fetch lines");
JavaInputDStream<ConsumerRecord<String, String>> dStream = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
dStream.foreachRDD(rdd -> {
processRows(rdd, keyword);
});
}
private void processRows(JavaRDD<ConsumerRecord<String, String>> rdds, String keyword) {
JavaRDD<String> rows = rdds.map(record -> record.value());
JavaRDD<String> lines = rows.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String s) throws Exception {
if (s == null || s.trim().length() < 1) {
return false;
}
if (!s.contains(keyword)) {
return false;
}
return true;
}
});
long count = lines.count();
logger.info("Kafka received " + count + " " + keyword);
}
}
Here is how we should run all these testing:
#Run the local#
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run binary on local#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run binary on Remote YARN Cluster#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
References:
http://sillycat.iteye.com/blog/2407639
The Streaming Small example Class, WordCountStreamingApp.java
package com.sillycat.sparkjava.app;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.sillycat.sparkjava.base.SparkBaseApp;
public class WordCountStreamingApp extends SparkBaseApp {
private static final long serialVersionUID = 7401544141510431796L;
protected String getAppName() {
return "WordCountStreamingApp";
}
public void executeTask(List<String> params) {
SparkConf conf = this.getSparkConf();
// The time interval at which streaming data will be divided into
// batches
logger.info("Start to have the streaming");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(30000));
ssc.checkpoint(this.getAppName());
logger.info("Prepare the resource for streaming");
processStream(ssc, "carl");
logger.info("Streaming is working");
try {
ssc.start();
ssc.awaitTermination();
} catch (InterruptedException e) {
logger.error("InterruptedException:", e);
}
}
private void processStream(JavaStreamingContext ssc, String keyword) {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "WordCountStreamingApp");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("sillycat-topic");
logger.info("Init the Kafka Clients to fetch lines");
JavaInputDStream<ConsumerRecord<String, String>> dStream = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
dStream.foreachRDD(rdd -> {
processRows(rdd, keyword);
});
}
private void processRows(JavaRDD<ConsumerRecord<String, String>> rdds, String keyword) {
JavaRDD<String> rows = rdds.map(record -> record.value());
JavaRDD<String> lines = rows.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String s) throws Exception {
if (s == null || s.trim().length() < 1) {
return false;
}
if (!s.contains(keyword)) {
return false;
}
return true;
}
});
long count = lines.count();
logger.info("Kafka received " + count + " " + keyword);
}
}
Here is how we should run all these testing:
#Run the local#
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run binary on local#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run binary on Remote YARN Cluster#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
References:
http://sillycat.iteye.com/blog/2407639
发表评论
-
Update Site will come soon
2021-06-02 04:10 1698I am still keep notes my tech n ... -
Stop Update Here
2020-04-28 09:00 333I will stop update here, and mo ... -
NodeJS12 and Zlib
2020-04-01 07:44 493NodeJS12 and Zlib It works as ... -
Docker Swarm 2020(2)Docker Swarm and Portainer
2020-03-31 23:18 380Docker Swarm 2020(2)Docker Swar ... -
Docker Swarm 2020(1)Simply Install and Use Swarm
2020-03-31 07:58 383Docker Swarm 2020(1)Simply Inst ... -
Traefik 2020(1)Introduction and Installation
2020-03-29 13:52 354Traefik 2020(1)Introduction and ... -
Portainer 2020(4)Deploy Nginx and Others
2020-03-20 12:06 440Portainer 2020(4)Deploy Nginx a ... -
Private Registry 2020(1)No auth in registry Nginx AUTH for UI
2020-03-18 00:56 452Private Registry 2020(1)No auth ... -
Docker Compose 2020(1)Installation and Basic
2020-03-15 08:10 394Docker Compose 2020(1)Installat ... -
VPN Server 2020(2)Docker on CentOS in Ubuntu
2020-03-02 08:04 478VPN Server 2020(2)Docker on Cen ... -
Buffer in NodeJS 12 and NodeJS 8
2020-02-25 06:43 403Buffer in NodeJS 12 and NodeJS ... -
NodeJS ENV Similar to JENV and PyENV
2020-02-25 05:14 498NodeJS ENV Similar to JENV and ... -
Prometheus HA 2020(3)AlertManager Cluster
2020-02-24 01:47 440Prometheus HA 2020(3)AlertManag ... -
Serverless with NodeJS and TencentCloud 2020(5)CRON and Settings
2020-02-24 01:46 346Serverless with NodeJS and Tenc ... -
GraphQL 2019(3)Connect to MySQL
2020-02-24 01:48 262GraphQL 2019(3)Connect to MySQL ... -
GraphQL 2019(2)GraphQL and Deploy to Tencent Cloud
2020-02-24 01:48 463GraphQL 2019(2)GraphQL and Depl ... -
GraphQL 2019(1)Apollo Basic
2020-02-19 01:36 336GraphQL 2019(1)Apollo Basic Cl ... -
Serverless with NodeJS and TencentCloud 2020(4)Multiple Handlers and Running wit
2020-02-19 01:19 322Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(3)Build Tree and Traverse Tree
2020-02-19 01:19 331Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(2)Trigger SCF in SCF
2020-02-19 01:18 306Serverless with NodeJS and Tenc ...
相关推荐
1. **Oracle Big Data Connectors**:允许Oracle数据库与Hadoop、Hive、HBase等大数据平台之间进行数据交换和集成。 2. **Oracle NoSQL数据库**:非关系型数据库系统,适合处理海量结构化和半结构化数据。 3. **...
deepseek经验分享-陈雄.pptx
本家政服务平台就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息,使用这种软件工具可以帮助管理人员提高事务处理效率,达到事半功倍的效果。此家政服务平台利用当下成熟完善的Spring Boot框架,使用跨平台的可开发大型商业网站的Java语言,以及最受欢迎的RDBMS应用软件之一的MySQL数据库进行程序开发。家政服务平台有管理员,雇主,雇员三个角色。管理员功能有个人中心,雇主管理,雇员管理,资料认证管理,项目类型管理,服务项目管理,需求信息管理,服务预约管理,申请预约管理,签订合同管理,雇主评价管理,留言板管理,系统管理。雇主可以发布需求,雇员可以申请预约,雇主支付报酬,雇主和雇员可以签订合同,雇主可以对雇员进行评价。家政服务平台的开发根据操作人员需要设计的界面简洁美观,在功能模块布局上跟同类型网站保持一致,程序在实现基本要求功能时,也为数据信息面临的安全问题提供了一些实用的解决方案。可以说该程序在帮助管理者高效率地处理工作事务的同时,也实现了数据信息的整体化,规范化与自动化。 关键词:家政服务平台;Spring Boot框架;MySQL;自动化
SAP SD-Class 4 Item proposal & CMIR-Customer material info record.mp4
【毕业设计】“跑鸭”微信小程序-一款基于校园跑步的社交小程序(实时里程配速、运动路径、整公里提醒、周榜月榜、打卡分享、热门推荐、线上活动、勋章墙、隐私设置 【源码+论文+答辩ppt+开题报告+任务书】.zip【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、MATLAB、python、web、C#、EDA、proteus、RTOS等项目的源码。 【项目质量】:所有源码都经过严格测试,可以直接运行。功能在确认正常工作后才上传。 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【附加价值】:项目具有较高的学习借鉴价值,也可直接拿来修改复刻。对于有一定基础或热衷于研究的人来说,可以在这些基础代码上进行修改和扩展,实现其他功能。 【沟通交流】:有任何使用上的问题,欢迎随时与博主沟通,博主会及时解答。鼓励下载和使用,并欢迎大家互相学习,共同进步。
大学生创业项目源码
使用requests和BeautifulSoup库爬取豆瓣电影Top250的基本信息
数据集文件包含美国各地电动汽车 (EV) 注册的详细记录,其结构便于分析并与数据处理工具集成。它通常以 CSV 格式提供,以确保与各种数据分析平台的兼容性。 关键列和数据属性: 做:电动汽车制造商(例如,特斯拉、日产、雪佛兰)。 型:车辆的特定型号(例如,Model S、Leaf、Bolt)。 车型年份:车辆模型的制造年份。 电动续航里程:每次充电的估计电动行驶里程(以英里为单位)。 EV 类型:将车辆分类为电池电动汽车 (BEV) 或插电式混合动力电动汽车 (PHEV)。 州和县:车辆注册的地理位置,允许进行区域分布分析。 注册人数:每个地区每个车型注册的车辆数量,有助于识别高浓度的 EV 区域。
施耐德ATV312变频器通过MCGS RTU通讯实现双机监控与控制的触摸屏集成解决方案,无PLC的施耐德ATV312变频器通讯示例:触摸屏控制监控两台变频器,功能多且省成本,改进型可调整步长 P&O MPPT(二区MPPT复现),光储系统MPPT 直流负载供电的单级离网光伏系统中,降压转器将太阳能光伏阵列和直流负载连接起来,同时确保最大功率点跟踪(MPPT) 和电池充电控制的良好运行。 在MPPT方面,提出了一种改进的自适应步长扰动观测(P&O)方法,以达到不同天气条件下太阳能光伏阵列的实际最大功率点(MPP),同时减少稳态振荡和功率损耗。 此外,电池充电控制侧使用三级充电控制器 (TSCC) 为铅酸电池站充电。 ,改进型P&O; 复现二区MPPT; 光储系统MPPT; 最大功率点跟踪(MPPT); 步长扰动观测; 降压转换器; 太阳能光伏阵列; 电池充电控制; 三级充电控制器(TSCC); 铅酸电池站。,改进型P&O MPPT技术,光储系统高效能量管理
基于stm32家庭安全防控系统 (程序+WiFi)
Maxwell电机与Simplorer联合仿真教程:矢量控制SVPWM算法与电路搭建详解,自定义电机模型替换指南,Maxwell电机与Simplorer联合仿真教程:电路搭建及矢量控制SVPWM算法实
CNN相关以及垃圾分类数据集
互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱,出错率高,信息安全性差,劳动强度大,费时费力等问题,采用流浪动物救助网站可以有效管理,使信息管理能够更加科学和规范。 流浪动物救助网站在Eclipse环境中,使用Java语言进行编码,使用Mysql创建数据表保存本系统产生的数据。系统可以提供信息显示和相应服务,其管理员增删改查动物信息和动物信息资料,审核动物信息预订订单,查看订单评价和评分,通过留言功能回复用户提问。 总之,流浪动物救助网站集中管理信息,有着保密性强,效率高,存储空间大,成本低等诸多优点。它可以降低信息管理成本,实现信息管理计算机化。 关键词:流浪动物救助网站;Java语言;Mysql
对机动车号牌信息管理的提升,也为了对机动车号牌信息进行更好的维护,机动车号牌管理系统的出现就变得水到渠成不可缺少。通过对机动车号牌管理系统的开发,不仅仅可以学以致用,让学到的知识变成成果出现,也强化了知识记忆,扩大了知识储备,是提升自我的一种很好的方法。通过具体的开发,对整个软件开发的过程熟练掌握,不论是前期的设计,还是后续的编码测试,都有了很深刻的认知。 机动车号牌管理系统通过MySQL数据库与Spring Boot框架进行开发,机动车号牌管理系统能够实现牌照换补申请管理,用户管理,牌照申请管理,牌照转移申请管理,车辆信息管理,公告信息管理等功能。 通过机动车号牌管理系统对相关信息的处理,让信息处理变的更加的系统,更加的规范,这是一个必然的结果。已经处理好的信息,不管是用来查找,还是分析,在效率上都会成倍的提高,让计算机变得更加符合生产需要,变成人们不可缺少的一种信息处理工具,实现了绿色办公,节省社会资源,为环境保护也做了力所能及的贡献。 关键字:机动车号牌管理系统,牌照,车辆信息
Maxwell电机与Simplorer联合仿真教程:矢量控制SVPWM算法及电路搭建指南,包含详细视频与可复制电机模型替换示范,教程Simplorer与Maxwell电机联合仿真,详细教程包含电路
内容概要:本文档详细介绍了一个名为《Python实现基于IBES-ELM基于改进的秃鹰搜索优化算法优化极限学习机的数据回归预测》的项目。该项目旨在通过结合改进的秃鹰搜索优化算法(IBES-EO)和极限学习机(ELM),优化ELM模型以提高其预测精度,尤其针对多指标、高维数据以及噪声数据的处理进行了探讨。项目涵盖了从数据预处理到建模预测的一系列完整流程,并提供了代码案例和GUI界面设计思路。文档详细阐述了模型的工作机制、适用场景及其实现细节。 适合人群:对机器学习有兴趣,特别是对ELM、IBES-EO感兴趣的研究人员、开发人员和技术爱好者。 使用场景及目标:适用于各种回归预测问题,包括但不限于金融预测、气象预测、健康数据分析和智能交通系统等。目标在于提供一种高效的解决方案,提高在大规模复杂数据集中进行回归预测的能力,同时也展示了如何将生物启发式的优化算法运用于改进现有的机器学习模型,为实际应用提供更多可能。 阅读建议:文档按照章节顺序编排,从背景介绍到具体实现再到最终总结。初学者可以从头至尾通读,以掌握全流程概念和技能;有一定经验的读者可以直接跳转至自己感兴趣的环节,例如优化算法的具体设计或者代码实现部分。建议边学习边动手实验,以达到最佳的学习效果,并可通过提供的完整示例代码加深理解和记忆。此外,项目中有关于系统架构设计、API接口搭建等内容也可作为实际工程项目参考。
本案例使用鸢尾花萼长度(sepal length)、花萼宽度(sepal width)、花瓣长度(petal length)和花瓣宽度(petal width)作为特征,采用Adaboost算法将其进行分类。 同学们通过本案例学习Adaboost算法的理论基础以及在分类问题中的应用,同时通过实际操作加深对分类算法和数据分析的理解。更好的掌握机器学习算法,培养对数据科学的兴趣和实践能力。
【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、MATLAB、python、web、C#、EDA、proteus、RTOS等项目的源码。 【项目质量】:所有源码都经过严格测试,可以直接运行。功能在确认正常工作后才上传。 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【附加价值】:项目具有较高的学习借鉴价值,也可直接拿来修改复刻。对于有一定基础或热衷于研究的人来说,可以在这些基础代码上进行修改和扩展,实现其他功能。 【沟通交流】:有任何使用上的问题,欢迎随时与博主沟通,博主会及时解答。鼓励下载和使用,并欢迎大家互相学习,共同进步。
【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、MATLAB、python、web、C#、EDA、proteus、RTOS等项目的源码。 【项目质量】:所有源码都经过严格测试,可以直接运行。功能在确认正常工作后才上传。 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【附加价值】:项目具有较高的学习借鉴价值,也可直接拿来修改复刻。对于有一定基础或热衷于研究的人来说,可以在这些基础代码上进行修改和扩展,实现其他功能。 【沟通交流】:有任何使用上的问题,欢迎随时与博主沟通,博主会及时解答。鼓励下载和使用,并欢迎大家互相学习,共同进步。
Wordpress-6.7.2.zip