Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • sparksql简介

Last edited by lizj Mar 10, 2022
Page history
This is an old version of this page. You can view the most recent version or browse the history.

sparksql简介

spark之简单数据处理

什么是spark

Spark是一个通用的分布式数据处理引擎。
通用:通用指的是Spark可以做很多事情。包括机器学习、数据流传输、交互分析、ETL、批处理、图计算等等都是Spark可以做到的。
甚至可以说,你需要用数据实现的任何事情,你都可以用Spark试试看。
分布式:指的是Spark处理数据的能力是建立在许多机器上的,是可以和分布式的存储系统对接的,是可以做横向扩展的(简单点说就是电脑越多,能力越大)
引擎:所谓引擎,说的就是Spark自己不会存储数据,它就像实体的机械引擎一样,会将燃料(对Spark来说是数据)转化成使用者需要的那种形式——例如驱动汽车,
再例如得到一个需要的目标结论。但无论如何,巧妇难为无米之炊,没数据是万万不行的。

什么是sparksql

SparkSQL是Spark⽤来处理结构化数据的⼀个模块,它提供了⼀个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作⽤。
与基本的SparkRDDAPI不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用此额外信息来执行额外的优化。
有几种方法可以与SparkSQL交互,包括SQL和DatasetAPI。计算结果时,将使用相同的执行引擎,而与用于表示计算的API/语言无关。
这种统一意味着开发人员可以轻松地在不同的API之间来回切换,基于这些API提供了最自然的方式来表达给定的转换。

Hive是将Hive SQL转换成MapReduce然后提交到集群上执⾏,⼤⼤简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执⾏效率⽐较慢。
所有SparkSQL的应运⽽⽣,它是将SparkSQL转换成RDD,然后提交到集群执⾏,执⾏效率⾮常快!

Hive的应⽤其实就是应对不会写Java的开发⼈员但是有会写SQL的数据库语⾔提供的是MR的⼀种简化。
SparkSQL其实也就是对SparkCore中RDD的⼀种简化,⽤SQL的语⾔可以对RDD编程进⾏开发。

ps:Spark是有处理上限的(10PB),超过这个范围还是要使⽤hive来进⾏处理,hive是在100PB级别

为什么要用sparksql

1. 易整合:将SQL查询与Spark程序无缝混合。
SparkSQL允许使用SQL或DataFrame的API在Spark程序中查询结构化数据。支持Java,Scala,Python和R语言。
val results: DataFrame = spark.sql(
  "SELECT * FROM people")
val names = results.map(lambda p: p.name)

2. 统一的数据访问方式:以相同的方式连接到任何数据源。
DataFrame和SQL提供了一种访问各种数据源的常用方法,包括 Hive、Avro、Parquet、ORC、JSON和JDBC。甚至可以跨这些源联接数据。
spark.read.json("s3n://...")
  .registerTempTable("json")
val results: DataFrame = spark.sql(
  """SELECT *
     FROM people
     JOIN json ...""")

3. 兼容hive:在现有仓库上运行SQL或HiveQL查询。
SparkSQL支持HiveQL语法以及HiveSerDes和UDF,允许访问现有的Hive仓库。建立session时打开hive支持即可,enableHiveSupport()

4. 标准的数据连接:通过JDBC或ODBC连接。
服务器模式为商业智能工具提供行业标准的JDBC和ODBC连接。

我们可以用来干什么

1. 操作hive,由mysql同步过来的大表、存量工商表等
2. 操作hudi,线上mongo同步过来的ic等
3. 操作mysql,大表不建议直接操作
4. 操作es
5. 操作nebula
6. 操作各种格式的文件等
各种类型数据源操作后转存到其他存储工具

原理是什么

SparkSQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执⾏。SparkSQL会先将SQL语句解析成⼀棵树,
然后使⽤规则(Rule)对Tree进⾏绑定、优化等处理过程。Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成:
Core: 负责处理数据的输⼊和输出,如获取数据,查询结果输出成DataFrame等
Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等
Hive: 负责对Hive数据进⾏处理
Hive-ThriftServer: 主要⽤于对hive的访问

1646734530342

LogicalPlan-->逻辑计划
Analyzer-->分析器
Optimizer-->优化器

1646734570612

怎么使用sparksql

可以用python、scala等进行开发,这里演示scala

spark-shell

1646896815162

直接启动spark-shell,根据需要可以不带参数直接启动也可以指定相关配置启动,启动后进入交互界面,会默认创建
一个名为spark的sparkSession和一个名为sc的sparkContext,并且打开了.enableHiveSupport()。
可以使用spark和sc通过编码直接进行操作。

spark-shell \
--master yarn-client \
--executor-memory 16g  \
--num-executors 30 \
--executor-cores 6 \
--total-executor-cores 180 \
--jars /data/227/data4/nebula_data/jars/mysql-connector-java-8.0.27.jar \
--driver-class-path /data/227/data4/nebula_data/jars/mysql-connector-java-8.0.27.jar

示例:
1. 操作hive:
  val dfHive: DataFrame = spark.sql("select * from ic_base.company_industry limit 100").persist(StorageLevel.MEMORY_AND_DISK_SER)
  dfHive.show(10, 32)
  dfHive.count
  dfHive.printSchema
  dfHive.createOrReplaceTempView("tmp")
  spark.sql("insert into table select * from tmp")
2. 操作hudi:
  val dfHudi: DataFrame = spark
            .read.format("org.apache.hudi")
            .load("hdfs://emr-header-1.cluster-210983:9000/user/collie/warehouse/ods/ic.db/company_branch/*")
#  			.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
#			.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitTime)
#			.option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/year=2020/month=*/day=*")
  dfHudi.write.format("org.apache.hudi").mode(SaveMode.Append).save("path")
3. 操作mysql:
  val dfMysql1: DataFrame = spark
            .read
            .jdbc(url,table,column,lowerBound,upperBound,numPartitions,new Properties())
  val dfMysql2: DataFrame = spark
            .read
            .format("jdbc")
            .options(Map("url" -> properties.getProperty("jdbc.url"),
                "dbtable" ->
                    f"(SELECT commit_time FROM nebula_commit_time where type = 'dwm' and table_name = '$table' order by update_time desc) nebula_commit_time"))
            .load()
  dfMysql1
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://10.8.6.87:3801/utn_ec?user=root&password=root123","tb_ebusiness_data_b",new Properties())
4. 操作各种文件
  val df: DataFrame = spark
        .read
        .format("csv")
        .option("inferschema", "false")
        .option("header", "false")
        .option("quote", "\'")
        .option("encoding", "utf8")
        .load(path)
        .toDF("_id", "branch_company_name_digest", "company_name_digest", "update_time", "n_company_status")
  val df: DataFrame = spark
        .read
        .json
        .load("path")    
  df.write
    .mode(SaveMode.Append)
    .option("quoteAll", value = true)
    .csv(csvPath.concat("base_tmp"))
  df.write
    .mode(SaveMode.Append)
    .json("path")
    .parquet("path")

spark-submit

编写程序后打成jar包(包含依赖和不包含依赖),通过spark-submit将任务提交到集群上执行,此处结束maven方式和工件方式

spark-submit \
--deploy-mode cluster \
--jars jars/nebula-spark-connector-3.0.0.jar \
--driver-class-path jars/nebula-spark-connector-3.0.0.jar \
--class com.pa.v3.Update azkaban/equity-penetration.jar \
azkaban/updateDwmNebula.properties

链接

Spark SQL 和 DataFrames - Spark 2.4.7 文档 (apache.org)

GitHub - apache/spark: Apache Spark - A unified analytics engine for large-scale data processing

Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages