Stack Overflow » Scala
7 FOLLOWERS
Get discussions on Apache Spark, Gradle, Apache Spark SQL, and more. Share knowledge on how to set up JAR files in IntelliJ for Scala, help people convert a map, and write scripts here.
Stack Overflow » Scala
3h ago
I would like to add comments to columns of an existing Delta table, without having to actually write SQL statements like "ALTER TABLE ALTER COLUMN". Is it possible to do it using only Scala ..read more
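Not part of the question, but one commonly suggested SQL-free route (a sketch assuming Spark 3.3+, which added `Dataset.withMetadata`, with hypothetical table and column names) is to attach a `comment` key to the column metadata and rewrite the table with its new schema:

```scala
import org.apache.spark.sql.types.MetadataBuilder

// Hypothetical table and column names for illustration.
val df = spark.table("events")
val commented = df.withMetadata(
  "user_id",
  new MetadataBuilder().putString("comment", "Primary key of the user").build()
)

// overwriteSchema rewrites the Delta schema, carrying the new column
// metadata (including the comment) into the table definition.
commented.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("events")
```

Note that this rewrites the data as well as the metadata, so it may not suit large tables.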
Stack Overflow » Scala
12h ago
I am working on building/researching a data warehousing solution for my firm. Requirements are:
We can get close to 2 million records per day (100 columns each).
Users should be able to query/do analytics on at least 1 year's worth of data as quickly as possible (2 million * 365 records).
Most of the legacy code is written in Scala, so any solution with good support for Scala is also a plus...
Database/data warehouse solutions (AWS-based):
RedShift
RDS
Aurora
Hosting MySQL on an EC2 instance
Data warehouse solutions (non-AWS):
Snowflake
BigQuery
Any suggestions? Thank you for your help ..read more
Stack Overflow » Scala
15h ago
We have a requirement where we need to truncate a BQ table from Scala Spark.
The idea behind this is that every table column has a description attached to it. If we overwrite the table, the descriptions no longer persist.
We have explored various options like -
.option("writeDisposition","WRITE_TRUNCATE") -- Unfortunately it didn't work :(
.mode("overwrite") -- This didn't preserve the description
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{BigQueryOptions, TableId}
val bq = BigQueryOptions.getDefaultInstance().getService()
val table = bq.getTable(TableId.of("project_id ..read more
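Not from the question: since the post already reaches for the repackaged BigQuery client, one option (table path here is illustrative) is to issue `TRUNCATE TABLE` through that client, which deletes rows but keeps the table definition and its column descriptions, and then append with Spark:

```scala
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{BigQueryOptions, QueryJobConfiguration}

val bq = BigQueryOptions.getDefaultInstance().getService()

// TRUNCATE TABLE removes all rows but leaves the schema, including
// column descriptions, untouched.
bq.query(QueryJobConfiguration.of("TRUNCATE TABLE `project_id.dataset.table`"))

// Append into the now-empty table so the existing descriptions survive.
df.write
  .format("bigquery")
  .mode("append")
  .save("project_id.dataset.table")
```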
Stack Overflow » Scala
18h ago
I have a dataframe that is a list of Delta tables in the hive_metastore. For each table, I want to fetch the Delta Log to extract some information. I can do this by collecting the DataFrame in an Array and processing each row separately (sequentially or in parallel), but I am trying to test if using UDFs could speed up all or some parts.
The UDF:
import org.apache.spark.sql.functions.udf
val udfGetDeltaLog = udf(
(catalog: String, database: String, table: String) => {
val deltaLog = try {
Some(DeltaLog.forTable(spark, TableIdentifier(table, Some(database), Some(catalog ..read more
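Not from the question, but worth noting: `DeltaLog.forTable(spark, …)` needs the driver-side SparkSession, which is not available inside a UDF running on executors, so a driver-side fan-out is often used instead. A minimal sketch of that pattern with plain Scala Futures, where the tuples stand in for the collected rows and the string result stands in for the real DeltaLog lookup:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Placeholder for df.collect(): (catalog, database, table) triples.
val tables = Seq(("main", "sales", "orders"), ("main", "sales", "customers"))

// Fan out one Future per table; each would call DeltaLog.forTable on the driver.
val results: Seq[String] = Await.result(
  Future.traverse(tables) { case (catalog, db, table) =>
    Future(s"$catalog.$db.$table") // stand-in for the real per-table work
  },
  1.minute
)
```

`Future.traverse` preserves input order, so results line up with the collected rows.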
Stack Overflow » Scala
18h ago
Given input dataframe with structure:
| machine_id | process_id | activity_type | timestamp |
| ---------- | ---------- | ------------- | --------- |
| 0 | 0 | start | 0.712 |
| 0 | 0 | end | 1.52 |
| 0 | 1 | start | 3.14 |
| 0 | 1 | end | 4.12 |
| 1 | 0 | start | 0.55 |
| 1 | 0 | end | 1.55 |
The task is to calculate the average process time per machine.
The solution is to calculate diffe ..read more
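The excerpt is cut off, but the approach it starts to describe (end timestamp minus start timestamp per process, then the mean per machine) can be sketched in plain Scala on the sample rows above; a Spark version would use the same two groupings:

```scala
case class Activity(machineId: Int, processId: Int, activityType: String, timestamp: Double)

val rows = Seq(
  Activity(0, 0, "start", 0.712), Activity(0, 0, "end", 1.52),
  Activity(0, 1, "start", 3.14),  Activity(0, 1, "end", 4.12),
  Activity(1, 0, "start", 0.55),  Activity(1, 0, "end", 1.55)
)

// Duration of each (machine, process) pair: end - start.
val durations: Seq[(Int, Double)] =
  rows.groupBy(r => (r.machineId, r.processId)).toSeq.map { case ((machine, _), acts) =>
    val ts = acts.map(a => a.activityType -> a.timestamp).toMap
    machine -> (ts("end") - ts("start"))
  }

// Average duration per machine.
val avgPerMachine: Map[Int, Double] =
  durations.groupBy(_._1).map { case (m, ds) => m -> ds.map(_._2).sum / ds.size }
```

For machine 0 this averages the two process durations (0.808 and 0.98); machine 1 has a single process.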
Stack Overflow » Scala
21h ago
Currently, I am trying to run a GPS-to-Cartesian converter app locally using Scala. While writing the output Parquet files, I got the following error:
[TASK_WRITE_FAILED] Task failed while writing rows to file:/opt/spark-apps/2251.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:774)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
at org.apache.spark.rdd.RDD ..read more
Stack Overflow » Scala
21h ago
I am trying to convert JSON from one structure to another. In the process I would like to add a prefix to one of the string fields (if it exists). I need to use the Play framework.
How would I add-to/process json fields as they are parsed using macros?
case class Book(title: Option[String], author: Option[String], published: Option[Long], info: JsObject)
object Book {
implicit val bookImplicitReads = Json.reads[Book]
implicit val bookImplicitWrites = Json.writes[Book]
}
def bookConvertor(inputObject: JsObject) = {
Try {
val title = (inputObject \ "title").asOpt[String]
val auth ..read more
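Not from the question: one way this is often done with play-json is a JSON transformer that rewrites the field before the case-class `Reads` runs, falling back to the untouched object when the field is absent. The field name and prefix here are illustrative, and the exact transformer semantics should be checked against the play-json version in use:

```scala
import play.api.libs.json._

// Update the string at "title" in place, prepending a prefix.
val prefixTitle: Reads[JsObject] =
  (__ \ "title").json.update(
    Reads.of[JsString].map(s => JsString("PREFIX-" + s.value))
  )

def withPrefixedTitle(inputObject: JsObject): JsObject =
  inputObject.transform(prefixTitle).getOrElse(inputObject) // unchanged if "title" is missing
```

The transformed `JsObject` can then be validated as `Book` with the existing implicit `Reads`.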
Stack Overflow » Scala
1d ago
Following code imports the same symbol twice, because it imports two different objects which both export it:
object Ext:
def backwards(s: String): String = s.reverse
object A:
export Ext.*
object B:
export Ext.*
import A.*
import B.*
backwards("Hello")
The error is:
Reference to backwards is ambiguous. It is both imported by import A._ and imported subsequently by import B._
It is eventually the same symbol, so there is in fact no ambiguity, but I guess some implementation details of export hide this from the compiler.
How can I solve this?
Motivation: In my project I ..read more
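Not from the question: since both wildcard imports forward to the same underlying method, one common workaround is to exclude the name from one of the two imports, leaving a single binding in scope (Scala 3 syntax):

```scala
import A.*
import B.{backwards as _, *} // exclude B's forwarder; A's binding is used

backwards("Hello") // resolves unambiguously via A
```

Importing the member explicitly from one object (`import A.backwards`) achieves the same effect, because a specific import takes precedence over a wildcard.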
Stack Overflow » Scala
2d ago
The JSON library 'Circe' has a Configuration param, transformConstructorNames: String => String, which allows you to transform the names of any class.
Is there an equivalent for Tapir?
The reason is that Tapir's Schema.derived[OpenAPISealedTrait] works great, but I want to drop the "OpenAPI" part from every class in the hierarchy.
For example, if I have
sealed trait OpenAPIShape
case class OpenAPISquare(size: Int) extends OpenAPIShape
case class OpenAPICircle(radius: Int) extends OpenAPIShape
I want to be able to write Schema.derived[OpenAPIShape] with some config that drops ..read more
Stack Overflow » Scala
2d ago
I am in a bind here. I am trying to implement a very basic pipeline which reads data from Kafka and processes it in Spark. The problem I am facing is that Apache Spark shuts down abruptly, giving the aforesaid error message. My PySpark version is 3.5.1 and my Scala version is 2.12.18.
The code in question is :-
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName('my_app') \
.config("spark.jars", "/usr/local/spark/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar") \
.getOrCreate()
df = spark.readStream \
.format('kafka ..read more