Incomplete RDD to DF conversion in PySpark
Using PySpark 1.6.3, I am trying to convert an RDD to a DataFrame. This is test code running in a Zeppelin notebook. The RDD of interest is rdd_ret.
>>> from pyspark.sql import Row
>>> rdd_ret.count()
9301
>>> rddofrows = rdd_ret.map(lambda x: Row(**x))
>>> things = rddofrows.take(10000)
>>> len(things)
9301
>>> [type(x) for x in things if type(x) != Row]
[]
>>> [len(x) for x in things if len(x) != 117]
[]
So we can see here that we definitely have 9301 rows, they are all Row objects, and every row has the same length. Now I want to convert to a DataFrame:
>>> outdf = rddofrows.toDF(sampleRatio=0.1)
>>> outdf.count()
This raises the error:
TypeError: 'NoneType' object is not iterable
The full stack trace is at the bottom.
The output DataFrame object is created, but any operation I try to run on it (.show(), .count(), .filter()) produces the same stack trace at the bottom. I don't understand what could be a NoneType in this situation. Sure, some values inside the Row objects might be wrong, but to count or show you only need to iterate over the rows of the DataFrame, and they are all there.
What is going on here?
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5665146503764823323.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py", line 269, in count
return int(self._jdf.count())
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2282.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 1256.0 failed 4 times, most recent failure: Lost task 21.3 in stage 1256.0 (TID 62913, usg-kov-e1b-slv005.c.pg-us-n-app-053847.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in convert_struct
return tuple(conv(v) for v, conv in zip(obj, converters))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in <genexpr>
return tuple(conv(v) for v, conv in zip(obj, converters))
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
As requested, here is what one of the rows looks like:
Row(accountType='individual', added='2018-06-05T01:52:34.257+0000', assignment='null', author='noahmagel', authorCity='null', authorCityCode='null',
authorContinent='North America', authorContinentCode='n-a', authorCountry='United States', authorCountryCode='us', authorCounty='null',
authorCountyCode='null', authorLocation='n-a,us,,,', authorState='null', authorStateCode='null', avatarUrl='https://pbs.twimg.com/profile_images/613069089263718401/P1BWMsFG_normal.jpg',
averageDurationOfVisit=20.0, averageVisits=6.0, backlinks=49850734.0, blogComments=0.0, checked=False, city='null', cityCode='null', continent='North America',
continentCode='n-a', country='United States', countryCode='us', county='null', countyCode='null', date='2017-12-11T10:58:36.000+0000',
displayUrls=[], domain='twitter.com', engagement=0.0, expandedUrls=[], facebookAuthorId='null', facebookComments=0.0, facebookLikes=0.0,
facebookRole='null', facebookShares=0.0, facebookSubtype='null', forumPosts=0.0, forumViews=0.0, fullText='@oli_braun @elonmusk @SpaceX Take my money 💰',
fullname='noah', gender='male', id=167783541878.0, imageMd5s=None, impact=34.0, importanceAmplification=28.0, importanceReach=40.0,
impressions=208.0, influence=502.0, insightsHashtag=[], insightsMentioned=['@elonmusk', '@spacex', '@oli_braun'], instagramCommentCount=0.0,
instagramFollowerCount=0.0, instagramFollowingCount=0.0, instagramInteractionsCount=0.0, instagramLikeCount=0.0, instagramPostCount=0.0,
interest=['Fine arts', 'Business', 'Technology'], language='en', lastAssignmentDate='null', latitude=0.0, lemmatize=['money'],
locationName='null', logoImages=None, longitude=0.0, matchPositions=[], mediaFilter='null', mediaUrls=[], monthlyVisitors=6000000000.0, mozRank=9.6,
originalUrl='http://twitter.com/noahmagel/statuses/940173969935818752', outreach=0.0, pageType='twitter', pagesPerVisit=22.0, percentFemaleVisitors=46.0,
percentMaleVisitors=54.0, priority='null', professions=[], queryId=1999376256.0, queryName='Braun_English', reach=502.0,
replyTo='http://twitter.com/oli_braun/statuses/940171345115144192', resourceId=167783541878.0, resourceType='page', retweetOf='null',
sentiment='neutral', shortUrls=[], snippet='@oli_braun @elonmusk @SpaceX Take my money 💰', starred=False, state='null', stateCode='null', status='null',
subtype='null', tags=[], textlen=44, threadAuthor='oli_braun', threadCreated='null', threadEntryType='reply', threadId='0', threadURL='null',
title='noah (@noahmagel): @oli_braun @elonmusk @Spac ...', trackedLinkClicks=0.0, trackedLinks='null', twitterAuthorId='2246429194',
twitterFollowers=208.0, twitterFollowing=513.0, twitterPostCount=381.0, twitterReplyCount=0.0, twitterRetweets=0.0, twitterRole='null',
twitterVerified=False, updated='2018-06-05T01:52:34.257+0000', url='http://twitter.com/noahmagel/statuses/940173969935818752', wordCount='null')
I reproduced this issue with the following:
sc = spark.sparkContext
json_rows = ['{"key1": [{"foo": 1}, {"bar": 2}]}',
'{"key2": 1}']
rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
rdd = df.rdd
new_df = spark.createDataFrame(rdd, samplingRatio=1)
new_df.head(2)
which gives the same error:
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
return lambda row: [conv(v) for v in row]
TypeError: 'NoneType' object is not iterable
Note that these rows work fine:
json_rows = ['{"key1": [1, 2]}',
'{"key2": 1}']
The problem arises when you have a list or ArrayType whose elements are of StructType or Row type.
StructType and Row types need a converter, see the source code:
def _need_converter(dataType):
    if isinstance(dataType, StructType):
        return True
    elif isinstance(dataType, ArrayType):
        return _need_converter(dataType.elementType)
So it will now try to convert the elements inside the array (lambda row: [conv(v) for v in row]). If the value for that key is ever None for some row (either because the key is absent from that row or it is explicitly None), this raises the error. Honestly, this looks like a bug in the Spark code? Wouldn't you expect it to check that the array is not None before calling conv() on its elements?
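To see why this blows up, here is a minimal pure-Python sketch of what the generated converter effectively does (conv and row_converter are illustrative names, not Spark's actual variables):
# Sketch: the array converter Spark generates is essentially a list
# comprehension over the value, with conv being the per-element converter.
conv = lambda v: v  # identity stand-in for the real per-field converter

row_converter = lambda row: [conv(v) for v in row]

row_converter([1, 2, 3])   # fine: [1, 2, 3]
row_converter(None)        # TypeError: 'NoneType' object is not iterable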
My solution was to flatten the nested rows/structs via a map, so the values become plain string literals and no conversion is needed:
import json
from pyspark.sql.types import Row, ArrayType

def flatten(x):
    # Convert the Row to a dict and serialize any nested Row values to
    # JSON strings, so no struct converter is needed downstream.
    x_dict = x.asDict()
    for k, v in x_dict.items():
        if isinstance(v, Row):
            x_dict[k] = json.dumps(v.asDict())
    return x_dict
sc = spark.sparkContext
rows = sc.parallelize(json_rows)
df = spark.read.json(rows)
flat_rdd = df.rdd.map(lambda x: flatten(x))
flat_df = spark.createDataFrame(flat_rdd, samplingRatio=1)
flat_df.head(2)
Note that for my use case all my nested objects were of type Row, and I could flatten the whole Row because it was headed to Redshift anyway. YMMV. For my list example above, you could additionally check for type list; I think it is perfectly fine to keep a nested list as long as its elements are literals, so you would flatten the elements of the list rather than the list itself, as sketched below.
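A variant of flatten along those lines might look like this (a sketch only, assuming the same inputs as above; not tested against the original data):
import json
from pyspark.sql import Row

def flatten_with_lists(x):
    # Like flatten() above, but also serializes Row elements inside lists.
    x_dict = x.asDict()
    for k, v in x_dict.items():
        if isinstance(v, Row):
            # Nested struct: serialize the whole thing to a JSON string.
            x_dict[k] = json.dumps(v.asDict())
        elif isinstance(v, list):
            # Keep the list, but serialize any Row elements inside it.
            x_dict[k] = [json.dumps(e.asDict()) if isinstance(e, Row) else e
                         for e in v]
    return x_dict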
The Python ** mapping is used with Python dict objects. I converted the Row to a dict with x.asDict(), which can then be used as **x.asDict().
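For example (a minimal sketch of the round trip):
from pyspark.sql import Row

r = Row(a=1, b=2)
d = r.asDict()      # {'a': 1, 'b': 2}
r2 = Row(**d)       # ** unpacks the dict back into keyword arguments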