Spark中从Struct<xxx>字符串如何快速创建Schema

在使用Spark 2.4.0的一个新方法schema_of_json,主要是用来从json格式字符串中推断Schema的,方法有两个重载,源码如下

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: String): Column = schema_of_json(lit(json))

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a string literal containing a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))

我不知道怎么使用schema_of_json(json: Column)这个方法,我通过如下测试报错:

scala> df.select(schema_of_json($"col"))
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`col`)' due to data type mismatch: The input json should be a string literal and not null; however, got `col`.;;

看错误信息是需要传入一个字符串参数,所以如图测试了

clipboard.png

它的Row是一个String对象,我现在能想到的就是用take取出一行,然后通过字符串分隔,取出各个Column的列,然后重构StructType。像这样

scala> val str = df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1)).take(1)(0).getAs[String](0)
str: String = cardno:string,cardtype:string,flag:string,times:string,userid:string

scala> val columns = str.split(",").map(x=>x.split(":")).map(x=>x(0))
columns: Array[String] = Array(cardno, cardtype, flag, times, userid)

scala> var schema = (new StructType)
schema: org.apache.spark.sql.types.StructType = StructType()

scala> columns.map(x=>{schema = schema.add(x,StringType,true)})
res154: Array[Unit] = Array((), (), (), (), ())

scala> schema
res159: org.apache.spark.sql.types.StructType = StructType(StructField(cardno,StringType,true), StructField(cardtype,StringType,true), StructField(flag,StringType,true), 
StructField(times,StringType,true), StructField(userid,StringType,true))

但是StructType有一个simpleString方法,返回的就是上图中的值

scala> schema.simpleString
res160: String = struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>

虽然我上面写的可行,但要是遇到那种嵌套的复杂的Schema,这样写就很复杂。想问下有什么快速把struct<xxx>字符串转化成StructType的方法吗?

或者有什么其他方法解析json格式的字符串吗?我这里使用的Structured Streaming,创建的是df,而需求是不能提前确定Schema长什么样,需要的是从消息字符串中去推断。

阅读 3.1k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进