
Secure your production workloads end-to-end with Databricks’ comprehensive access control system

Databricks offers role-based access control for clusters and the workspace to secure infrastructure and user code. Today, we are excited to announce role-based access control for Databricks Jobs as well, so that users can easily control who can access job output and who can control the execution of their production workloads.

A Databricks Job consists of a built-in scheduler, the task that you want to run, logs, the output of the runs, and alerting and monitoring policies. Databricks Jobs allows users to easily schedule notebooks, jars from S3, and Python files from S3, and also offers support for spark-submit. Users can also trigger their jobs from external systems like Airflow or Jenkins, as sketched below.
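
As a quick illustration of triggering jobs externally, an Airflow or Jenkins task can call the Jobs REST API's run-now endpoint. This is only a minimal sketch; the host, token, and job ID below are placeholders.

import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"                                   # placeholder

resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/jobs/run-now",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"job_id": 42},  # placeholder job ID
)
resp.raise_for_status()
print("Triggered run:", resp.json()["run_id"])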

Sensitivities with scheduled production jobs

When running production workloads, users want to control who can access different parts of their scheduled jobs and which actions they can perform. For example:

  • Securing job output: A phenomenal advantage of Databricks Jobs is that it allows users to easily schedule notebooks and view the results of different runs as shown below. These outputs can contain personally identifiable information (PII) or other sensitive data. Users want only certain colleagues to be able to view this information, without giving them any other control over the job.
  • Securing logs: Logs can contain sensitive information and users don’t want unprivileged users to view the logs.
  • Controlling job execution: As the owner of a job, you may want to restrict run control to your team members, so that only they can cancel bad runs or trigger manual runs for troubleshooting.
  • Controlling access to job properties: Databricks Jobs offers many great functionalities like custom alerting, monitoring, timeouts for job runs, etc. Users don’t want others changing the properties of their jobs.
  • Job ownership: Every Databricks Job has an owner on behalf of whom all the scheduled runs are executed. When an owner leaves an organization, there needs to be an easy way to switch the ownership, so that jobs are not left as orphans.

Securing your production workloads in Databricks

We are excited to introduce fine-grained access control for Databricks Jobs to safeguard different aspects of your production workloads from unprivileged users in the organization.

Enabling Jobs Access Control

Admins can enable access control for jobs, along with cluster access control, in the admin console.

Permission Levels

Once Jobs ACLs are enabled, each user or group can have one of five different permission levels on a Databricks Job. An admin or the job owner can give other users or groups one or more of these permissions. The permission levels form a hierarchy: a user with a higher permission level can do anything the lower levels allow. The following list captures the permission levels in order and what they entail.

  • Default: Allows a user only to look at the job settings and run metadata. The user cannot view any other sensitive information like job output or logs.
  • Can View: Allows the user only to look at the job output and logs along with job settings. The user cannot control the job execution.
  • Manage Runs: Allows the user to view the output and also cancel and trigger individual runs.
  • Owner: Allows the user to edit job settings.
  • Admin: Allows the user to change job owner.

These permissions can be granted from the ‘Advanced’ section of the job details page.
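
Permissions can also be set programmatically. The sketch below uses the jobs Permissions REST endpoint; the endpoint path, payload shape, and permission-level name are assumptions based on the current public API and are not described in this post.

import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"                                   # placeholder
JOB_ID = 123                                                        # placeholder job ID

resp = requests.patch(
    f"{DATABRICKS_HOST}/api/2.0/permissions/jobs/{JOB_ID}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"access_control_list": [
        {"user_name": "colleague@example.com", "permission_level": "CAN_VIEW"}
    ]},
)
resp.raise_for_status()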

Job Ownership

Only a job owner can change the job settings. If a job owner leaves the organization or wishes to transfer the ownership, the admin can easily switch the owner of that job. There can be only one owner for the job.

User Role and Actions

The following summarizes the different user roles and the actions they are allowed to perform. Because the permission levels are cumulative, each role can also perform everything the roles listed before it can:

  • Default user: view job settings.
  • User with view permission: also view current and historical job output and logs.
  • User with manage run permission: also trigger a new run and cancel current runs.
  • Job owner: also modify cluster & job settings, modify job permissions, and delete the job.
  • Admins: also change the owner to someone else.

End-to-end comprehensive access control system

Users can control access to infrastructure, code and data by leveraging all of the access control mechanisms provided by Databricks:

  • Cluster ACLs: All jobs require clusters, and cluster ACLs let users grant fine-grained permissions to users and groups on the underlying infrastructure on which production jobs run.
  • Workspace ACLs: Users can schedule notebooks from the Databricks workspace. Using the workspace ACLs, users can control who can read/run/modify their production code.
  • Data ACLs: Access control to data is offered through Amazon’s IAM roles. Databricks also supports access control on IAM roles so that admins can control which users are entitled to use what IAM roles in Databricks.
  • Jobs ACLs: As illustrated above, access control on the jobs themselves empowers users to secure their production jobs from unprivileged users.
What’s next?

Start running your Spark jobs on Databricks by signing up for a free trial of Databricks.

If you have any questions, you can contact us.


Try Databricks for free. Get started today.

The post Access Control for Databricks Jobs appeared first on Databricks.


Continuous integration and continuous delivery (CI/CD) is a practice that enables an organization to rapidly iterate on software changes while maintaining stability, performance, and security. The continuous integration (CI) practice allows multiple developers to merge code changes into a central repository. Each merge typically triggers an automated build that compiles the code and runs unit tests. Continuous delivery (CD) expands on CI by pushing code changes to multiple environments, such as QA and staging, after the build has completed, so that new changes can be tested for stability, performance, and security. CD typically requires manual approval before the new changes are pushed to production; continuous deployment automates the production push as well.

Figure 1: A CI/CD pipeline.

Many organizations have adopted various tools to follow CI/CD best practices in order to improve developer productivity and code quality, and to deliver software faster. As we see massive adoption of Databricks among our customer base for data engineering and machine learning, one question that comes up very often is how to follow CI/CD best practices for data pipelines built on Databricks.

In this blog, we outline the common integration points for a data pipeline in the CI/CD cycle and how you can leverage functionalities in Databricks to integrate with your systems. Our own internal data pipelines follow the approach outlined in this blog to continuously deliver audit logs to our customers.

Key challenges for CI/CD in building a data pipeline

The following are the key phases and challenges in following CI/CD best practices for a data pipeline:

Figure 2: A high level workflow for CI/CD of a data pipeline with Databricks.

  • Data exploration: Databricks’ interactive workspace provides a great environment for exploring the data and building ETL pipelines. When multiple users need to work on the same project, there are many ways a project can be set up and developed in this collaborative environment, and users often find it hard to settle on the right approach with notebooks.
  • Iterative development with unit tests: As you build ETL prototypes by exploring data in notebooks and move toward maturity, code can quickly get unwieldy and writing unit tests can become a problem.
  • Continuous integration and build: As new code is getting merged, the build server must be able to pull the latest changes and run the unit tests for various components and publish the latest artifacts.
  • Pushing data pipeline to staging environment: Once all the unit tests have passed, the build server must be able to push the data pipeline to a staging environment to test the pipeline on a much larger data set that resembles production data for performance and data quality.
  • Pushing data pipeline to production environment: The final phase is pushing the data pipeline in staging into production so that the next run of the pipeline picks the latest code and generates the new data set in production.

In the rest of the blog, we will walk through each of these phases and how you can leverage Databricks for building your data pipeline.

Development in Databricks’ Interactive Workspace

Let’s pick a scenario where three data engineers are planning to work on Project X. Let’s say Project X is meant to enhance an existing ETL pipeline whose source code lives in notebooks and libraries.

The development environment in Databricks typically consists of notebooks in the interactive workspace, libraries, and clusters.

The recommended approach is to set up a development environment per user. Each user pushes the notebooks to their own personal folder in the interactive workspace and works on their own copy of the source code there. They then export it via the API/CLI to their local development environment and check it in to their own branch before making a pull request against the master branch. They also have their own small clusters for development.

Figure 3: A recommended setup for multiple data engineers developing on a same project.

Setting up a dev environment in Databricks

To setup the dev environment, users can do the following:

  • Create a branch and checkout the code to their computer.
  • Copy the notebooks from local directory to Databricks’ workspace using the workspace command line interface (CLI)
databricks workspace import_dir etl/common /Users/alice@databricks.com/etl/common
databricks workspace import_dir etl/config /Users/alice@databricks.com/etl/config
  • Copy the libraries from local directory to DBFS using the DBFS CLI
databricks fs cp etl-2.1-assembly.jar dbfs:/alice/etl/etl-2.1-assembly.jar
  • Create a cluster using the API or UI.
  • Attach the libraries in DBFS to a cluster using the Libraries API (a minimal sketch follows).
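
As a rough illustration of that last step, the following Python sketch calls the Libraries REST API to attach the uploaded jar to a development cluster. The host, token, and cluster ID are placeholders, and the jar path mirrors the CLI command above.

import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"                                   # placeholder

resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "cluster_id": "<dev-cluster-id>",  # placeholder cluster ID
        "libraries": [{"jar": "dbfs:/alice/etl/etl-2.1-assembly.jar"}],
    },
)
resp.raise_for_status()
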
Iterative development

It is easy to modify your code in the Databricks workspace and iteratively test it on a sample data set. After this iterative development, when you want to check in your changes, you can do the following:

  • Download the notebooks
databricks workspace export_dir /Users/alice@databricks.com/etl/common etl/common
  • Create a commit and make a pull request in version control for code review.
Productionize and write unit test

As you move from a prototype to a mature stage of development, modularizing code and unit testing it becomes essential. The following are some best practices to consider during development:

  • Download some of the notebooks that have core logic to your computer and refactor them as Java / Scala classes or Python packages in your favorite IDE, following dependency injection principles.
  • Write unit tests for those classes.
  • Keep the lightweight business logic that might change frequently in notebooks.
  • Package the core logic as libraries, upload them back to Databricks, and keep iterating in notebooks that call the core logic (a minimal sketch follows this list).
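
To make the split concrete, here is a minimal, hypothetical sketch in the spirit of Figure 4 below: core logic lives in a small Python module with a unit test, and the notebook keeps only a thin wrapper. Module, function, and column names are illustrative.

# etl/transforms.py -- core logic packaged as a library (hypothetical module)
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_revenue(df: DataFrame) -> DataFrame:
    """Pure transformation that is easy to unit test on a tiny local DataFrame."""
    return df.withColumn("revenue", F.col("quantity") * F.col("unit_price"))

# tests/test_transforms.py -- unit test run by the build server (hypothetical)
def test_add_revenue(spark):  # `spark` is a local SparkSession test fixture
    df = spark.createDataFrame([(2, 10.0)], ["quantity", "unit_price"])
    assert add_revenue(df).collect()[0].revenue == 20.0

# The notebook then only stitches things together:
#   from etl.transforms import add_revenue
#   display(add_revenue(spark.table("raw_orders")))
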

Figure 4: An example code in notebook calling core logic in libraries.

Why not package all code into a library?

Some developers prefer putting all code into a library and directly run with Databricks Jobs in staging and production. While you can do that, there are some significant advantages of using the hybrid approach of having core logic in libraries and using notebooks as a wrapper that stitches everything together:

  • Parameterization: You can very quickly change the input parameters in a notebook and run your core logic in libraries. For a simple configuration change, you don’t need to recompile the jar, re-upload it, and run it again (see the notebook sketch after this list).

Figure 5: Easily parameterize and run your data pipeline.

  • Simple chaining: You can chain a simple linear workflow with fail fast mechanisms. The workflow could be chaining different code blocks from the same library or chaining different libraries too.
  • Easy performance analysis: You can easily look at the performance of different stages by having the code broken down into different cells and looking at the time taken for each cell to execute as shown in Figure 6.
  • Visual troubleshooting of intermediate stages: You can easily look at the intermediate results instead of searching through a lot of logs for your debug statements.

Figure 6: Leveraging notebooks to easily look at output statements and performance of intermediate stages.

  • Return results from the job: You can programmatically get the exit status of the notebook and take corresponding action in your workflow externally.
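
A minimal notebook sketch of the parameterization and return-value points above, using notebook widgets and dbutils.notebook.exit. The widget name and the run_pipeline call are hypothetical.

import json

dbutils.widgets.text("run_date", "2017-11-01")   # parameter with a default value
run_date = dbutils.widgets.get("run_date")        # can be overridden per run

rows_written = run_pipeline(run_date)             # hypothetical call into library code

# The exit value can be read programmatically by the external workflow.
dbutils.notebook.exit(json.dumps({"run_date": run_date, "rows_written": rows_written}))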

Continuous integration and build

Once the code is properly refactored as libraries and notebooks, the build server can easily run all the unit tests on them to make sure the code is of high quality. The build server will then push the artifacts (libraries and notebooks) to a central location (Maven or cloud storage like S3).

Pushing data pipeline to staging environment

Pushing a data pipeline to a staging environment in Databricks involves the following things:

  • Libraries: The build server can programmatically push the libraries to a staging folder in DBFS using the DBFS API (a combined sketch follows this list).
  • Notebooks: The build server can also programmatically push the notebooks to a staging folder in the Databricks workspace through the Workspace API.
  • Jobs and cluster configuration: The build server can then leverage the Jobs API to create a staging job with a certain set of configuration, provide the libraries in DBFS and point to the main notebook to be triggered by the job.
  • Results: The build server can also get the output of the run and then take further actions based on that.
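
The following Python sketch ties the first three items together from a build server: it stages the library and notebook and creates a staging job through the DBFS, Workspace, and Jobs REST APIs. All hosts, tokens, paths, and names are placeholders, and error handling is omitted.

import base64
import requests

HOST = "https://<your-workspace>.cloud.databricks.com"   # placeholder
HEADERS = {"Authorization": "Bearer <token>"}             # placeholder

# 1. Push the library to a staging folder in DBFS.
#    (For jars larger than 1 MB, use the DBFS streaming endpoints or the CLI instead.)
with open("target/etl-2.1-assembly.jar", "rb") as f:
    requests.post(f"{HOST}/api/2.0/dbfs/put", headers=HEADERS, json={
        "path": "dbfs:/staging/libs/etl-2.1-assembly.jar",
        "contents": base64.b64encode(f.read()).decode(),
        "overwrite": True,
    }).raise_for_status()

# 2. Push the main notebook to a staging folder in the workspace.
with open("etl/main.py", "rb") as f:
    requests.post(f"{HOST}/api/2.0/workspace/import", headers=HEADERS, json={
        "path": "/Staging/etl/main",
        "format": "SOURCE",
        "language": "PYTHON",
        "content": base64.b64encode(f.read()).decode(),
        "overwrite": True,
    }).raise_for_status()

# 3. Create a staging job that points at the notebook and library.
requests.post(f"{HOST}/api/2.0/jobs/create", headers=HEADERS, json={
    "name": "etl-staging",
    "new_cluster": {"spark_version": "3.1.x-scala2.11",
                    "node_type_id": "i3.xlarge", "num_workers": 2},
    "libraries": [{"jar": "dbfs:/staging/libs/etl-2.1-assembly.jar"}],
    "notebook_task": {"notebook_path": "/Staging/etl/main"},
}).raise_for_status()
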
Blue/green deployment to production environment

A production environment is very similar to the staging environment. Here we recommend a blue/green deployment for easy rollback in case of any issues. You can also do an in-place rollout.

Here are the steps required to do a blue/green deployment:

  • Push the new production ready libraries to a new DBFS location.
  • Push the new production ready notebooks to a new folder under a restricted production folder in Databricks’ workspace.
  • Modify the job configuration to point to the new notebook and library location so that the next run of the job can pick them up and run the pipeline with the new code.
set -x

# Pipe the conf into `databricks configure --token`, since that command only
# takes inputs from stdin. For token auth, two lines are expected: hostname and
# token. (For password auth there are three lines: hostname, username, password.)
# DATABRICKS_HOST and DATABRICKS_TOKEN are assumed to be set by the build server.
conf=`cat << EOM
$DATABRICKS_HOST
$DATABRICKS_TOKEN
EOM`

echo "$conf" | databricks configure --token

# Paths to place the new production libraries and notebooks (example values;
# BUILD_NUMBER is assumed to come from the CI system).
PRODUCTION_LIB_PATH="dbfs:/production/libs/$BUILD_NUMBER"
PRODUCTION_WORKSPACE_PATH="/Production/etl/$BUILD_NUMBER"

# Copy the library from the local filesystem to DBFS.
databricks fs cp target/common-utils-2.1.jar "$PRODUCTION_LIB_PATH/common-utils-2.1.jar"

# Recursively import the appropriate notebooks to the production folder.
databricks workspace import_dir etl "$PRODUCTION_WORKSPACE_PATH"

JSON=`cat << EOM
{
  "new_cluster": {
    "spark_version": "3.1.x-scala2.11",
    "node_type_id": "i3.xlarge",
    "num_workers": 5,
    "aws_attributes": {
      "availability": "ON_DEMAND",
      "enable_elastic_disk": true
    }
  },
  "notebook_task": {
    "notebook_path": "$PRODUCTION_WORKSPACE_PATH/main"
  },
  "libraries": [
    {
      "jar": "$PRODUCTION_LIB_PATH/common-utils-2.1.jar"
    }
  ],
  "timeout_seconds": 86400
}
EOM`

# Submit the job using the /api/2.0/jobs/runs/submit endpoint.
# If you choose to schedule the jobs using the Databricks Job Scheduler,
# you can use the /api/2.0/jobs/reset endpoint.
RUN_ID=`curl -X POST -d "$JSON" -H "Authorization: Bearer $DATABRICKS_TOKEN" "$DATABRICKS_HOST/api/2.0/jobs/runs/submit" | jq .run_id`

curl -H "Authorization: Bearer $DATABRICKS_TOKEN" "$DATABRICKS_HOST/api/2.0/jobs/runs/get?run_id=$RUN_ID"

Figure 8: Code snippet that demonstrates the blue/green deployment.

Figure 9: A production folder that has different sub-folders for each production push. The production folder has access only to very few people.

Conclusion and Next Steps

We walked through the different stages of CI/CD for a data pipeline and the key challenges. There are myriad ways to follow CI/CD best practices. We outlined a recommended approach with Databricks that we follow internally.

Different users have adopted different variants of the above approach. For example, you can look at how Metacog continuously integrates and delivers Apache Spark pipelines here.

If you are interested in adopting Databricks as your big data compute layer, sign up for a free trial of Databricks and try it for yourself. If you need a demo of the above capabilities, you can contact us.


Try Databricks for free. Get started today.

The post Continuous Integration & Continuous Delivery with Databricks appeared first on Databricks.


This is a guest community post from Li Jin, a software engineer at Two Sigma Investments, LP in New York.

Try this notebook in Databricks

This blog post introduces the Vectorized UDFs feature in the upcoming Apache Spark 2.3 release that substantially improves the performance and usability of user-defined functions (UDFs) in Python.

Over the past few years, Python has become the default language for data scientists. Packages such as pandas, numpy, statsmodel, and scikit-learn have gained great adoption and become the mainstream toolkits. At the same time, Apache Spark has become the de facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala, and then invoke them from Python.

Vectorized UDFs built on top of Apache Arrow bring you the best of both worlds—the ability to define low-overhead, high performance UDFs entirely in Python.

In Spark 2.3, there will be two kinds of vectorized UDFs: scalar and grouped. Next, we illustrate their usage using four example programs: Plus One, Cumulative Probability, Subtract Mean, Ordinary Least Squares Linear Regression.

Scalar Vectorized UDFs

Scalar vectorized UDFs are used for vectorizing scalar operations. To define a scalar vectorized UDF, simply use @pandas_udf to annotate a Python function that takes in pandas.Series as arguments and returns another pandas.Series of the same size. Below we illustrate using two examples: Plus One and Cumulative Probability.

Plus One

Computing v + 1 is a simple example for demonstrating differences between row-at-a-time UDFs and vectorized UDFs. Note that built-in column operators can perform much faster in this scenario.

Using row-at-a-time UDFs:

from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time udf
# Input/output are both a single double value
@udf('double')
def plus_one(v):
    return v + 1

df.withColumn('v2', plus_one(df.v))

Using vectorized UDFs:

from pyspark.sql.functions import pandas_udf

# Use pandas_udf to define a vectorized udf
# Input/output are both a pandas.Series of doubles
@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

df.withColumn('v2', vectorized_plus_one(df.v))

The examples above define a row-at-a-time UDF “plus_one” and a vectorized UDF “vectorized_plus_one” that performs the same “plus one” computation. The UDF definitions are the same except the function decorators: “udf” vs “pandas_udf”.

In the row-at-a-time version, the user-defined function takes a double “v” and returns the result of “v + 1” as a double. In the vectorized version, the user-defined function takes a pandas.Series “v” and returns the result of “v + 1” as a pandas.Series. Because “v + 1” is vectorized on pandas.Series, the vectorized version is much faster than the row-at-a-time version.
Note that there are two important requirements when using scalar vectorized UDFs:

  • The input and output series must have the same size.
  • How a column is split into multiple pandas.Series is internal to Spark, and therefore the result of the user-defined function must be independent of the splitting.
Cumulative Probability

This example shows a more practical use of a vectorized UDF: computing the cumulative probability of a value in the normal distribution N(0,1) using the scipy package.

import pandas as pd
from scipy import stats

@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))

df.withColumn('cumulative_probability', cdf(df.v))

stats.norm.cdf works on both a scalar value and a pandas.Series, so this example could be written with a row-at-a-time UDF as well. Similar to the previous example, the vectorized version runs much faster, as shown later in the “Performance Comparison” section.

Grouped Vectorized UDFs

Python users are fairly familiar with the split-apply-combine pattern in data analysis. The grouped vectorized UDFs are designed for this scenario, and they operate on all the data for some group, e.g., “for each date, apply this operation”.

A grouped vectorized UDF first splits a Spark DataFrame into groups based on the conditions specified in the groupby operator, applies a vectorized user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, and then combines and returns the results as a new Spark DataFrame.

Grouped vectorized UDFs use the same function decorator pandas_udf as scalar vectorized UDFs, but they have a few differences:

  • Input of the user-defined function:
    • Scalar: pandas.Series
    • Grouped: pandas.DataFrame
  • Output of the user-defined function:
    • Scalar: pandas.Series
    • Grouped: pandas.DataFrame
  • Grouping semantics:
    • Scalar: no grouping semantics
    • Grouped: defined by “groupby” clause
  • Output size:
    • Scalar: same as input size
    • Grouped: any size
  • Return types in the function decorator:
    • Scalar: a DataType that specifies the type of the returned pandas.Series
    • Grouped: a StructType that specifies each column name and type of the returned pandas.DataFrame

Next, let us walk through two examples to illustrate the use cases of grouped vectorized UDFs.

Subtract Mean

This example shows a simple use of grouped vectorized udf: subtracting mean from each value in the group.

# Input/output are both a pandas.DataFrame
@pandas_udf(df.schema)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)

In this example, we subtract the mean of v from each value of v for each group. The grouping semantics are defined by the “groupby” function, i.e., each input pandas.DataFrame to the user-defined function has the same “id” value. The input and output schema of this user-defined function are the same, so we pass “df.schema” to the decorator pandas_udf to specify the schema.

Grouped vectorized UDFs can also be called as standalone Python functions on the driver. This is very useful for debugging, for example:

sample = df.filter(df.id == 1).toPandas()

# Run as a standalone function on a pandas.DataFrame and verify the result
# (.func exposes the underlying Python function of the pandas_udf)
subtract_mean.func(sample)

# Now run with Spark
df.groupby('id').apply(subtract_mean)
In the example above, we first convert a small subset of the Spark DataFrame to a pandas.DataFrame, and then run subtract_mean as a standalone Python function on it. After verifying the function logic, we can call the grouped vectorized UDF as a Spark function over the entire dataset.

Ordinary Least Squares Linear Regression

The last example shows how to run OLS linear regression for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2) according to the statistical model Y = bX + c.

import pandas as pd
import statsmodels.api as sm

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

# Input/output are both a pandas.DataFrame
@pandas_udf(schema)
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

This example demonstrates that grouped vectorized UDFs can be used with any arbitrary Python function of the form pandas.DataFrame -> pandas.DataFrame. The returned pandas.DataFrame can have a different number of rows and columns than the input.

Performance Comparison

Lastly, we want to show a performance comparison between row-at-a-time UDFs and the new vectorized UDFs. We ran micro-benchmarks for three of the above examples (plus one, cumulative probability, and subtract mean).

Configuration and Methodology

We ran the benchmark on a single node Spark cluster on Databricks community edition.

Configuration details:
Data: A 10M-row DataFrame with an Int column and a Double column
Cluster: 6.0 GB Memory, 0.88 Cores, 1 DBU
Databricks runtime version: Latest RC (3.4, Scala 2.11)

For the detailed implementation of the benchmark, check the vectorized UDF Notebook.

As shown in the charts, vectorized UDFs perform much better than row-at-a-time UDFs across the board, ranging from 3x to over 100x.

Conclusion and Future Work

The upcoming Spark 2.3 release lays down the foundation for substantially improving the capabilities and performance of user-defined functions in Python. In the future, we plan to introduce support for vectorized UDFs in aggregations and window functions. The related work can be tracked in SPARK-22216.

Vectorized UDFs are a great example of the Spark community effort. We would like to thank Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li and many others for their contributions. Finally, special thanks to the Apache Arrow community for making this work possible.

What’s Next

You can try the vectorized UDF notebook yourself; this feature is now available as part of Databricks Runtime 3.4.


Try Databricks for free. Get started today.

The post Introducing Vectorized UDFs for PySpark appeared first on Databricks.


Combining the best of data warehouses, data lakes and streaming

For an in-depth look and demo, join the webinar.

Today we are proud to introduce Databricks Delta, a unified data management system to simplify large-scale data management. Currently, organizations build their big data architectures using a mix of systems, including data warehouses, data lakes and streaming systems. This greatly increases cost and, more consequentially, operational complexity as systems become hard to connect and maintain.

Databricks Delta is a single data management tool that combines the scale of a data lake, the reliability and performance of a data warehouse, and the low latency of streaming in a single system for the first time. Together with the rest of the Databricks Unified Analytics Platform, Delta makes it dramatically easier to build, manage, and put big data applications into production.

The Problem with Current Data Architectures

Before we dive into Delta, let’s discuss what makes current big data architectures hard to build, manage, and maintain. Most modern data architectures use a mix of at least three different types of systems: streaming systems, data lakes, and data warehouses.

Business data arrives through streaming systems such as Amazon Kinesis or Apache Kafka, which largely focus on rapid delivery. Data is then stored long-term in data lakes, such as Apache Hadoop or Amazon S3, which are optimized for large-scale, ultra low-cost storage. Unfortunately, data lakes alone do not have the performance and features needed to support high-end business applications: thus, the most valuable data is uploaded to data warehouses, which are optimized for high performance, concurrency and reliability at a much higher storage cost than data lakes.

This conventional architecture creates several challenges that all enterprises struggle with. First, the Extract-Transform-Load (ETL) process between these storage systems is error-prone and complex. Data teams spend a large fraction of their time building ETL jobs. If these jobs miss some input data one day or upload data containing errors, all downstream applications suffer. Second, the ETL process adds considerable latency, meaning it can take hours from when a record arrived to when it appears in a data warehouse.

Greg Rokita, executive director of technology at Edmunds.com, describes the problem well: “At Edmunds, obtaining real-time customer and revenue insights is critical to our business. But we’ve always been challenged with complex ETL processing that slows down our access to data.”

At Databricks, we’ve seen these problems throughout organizations of all sizes since we started the company. Based on this experience, we’ve been looking for ways to radically simplify data management. In short, what if we could provide the main benefits of each type of system—data lakes, data warehouses and streaming—in one unified platform, without expensive and error-prone ETL? This is exactly what we built in Delta.

Databricks Delta: Unified Data Management

Delta is a new type of unified data management system that combines the best of data warehouses, data lakes, and streaming. Delta runs over Amazon S3 and stores data in open formats like Apache Parquet. However, Delta augments S3 with several extensions, allowing it to meet three goals:

  1. The reliability and performance of a data warehouse: Delta supports transactional insertions, deletions, upserts, and queries; this enables reliable concurrent access from hundreds of applications. In addition, Delta automatically indexes, compacts and caches data; this achieves up to 100x improved performance over Apache Spark running over Parquet or Apache Hive on S3.
  2. The speed of streaming systems: Delta transactionally incorporates new data in seconds and makes this data immediately available for high-performance queries using either streaming or batch.
  3. The scale and cost-efficiency of a data lake: Delta stores data in cloud blob stores like S3. From these systems it inherits low cost, massive scalability, support for concurrent accesses, and high read and write throughput.

With Delta, organizations no longer need to make a tradeoff between storage system properties, or spend their resources moving data across systems. Hundreds of applications can now reliably upload, query and update data at massive scale and low cost.

From a technical standpoint, Delta achieves these goals by implementing two fundamental extensions over S3:

  • ACID transactions and
  • automatic data indexing (integrated with Delta transactions).

These extensions let Delta perform a wide variety of optimizations, while still providing reliable data access for applications, on behalf of the user. Delta plugs into any Spark job as a data source, stores data in each user’s individual S3 account, and integrates with Databricks Enterprise Security to provide a complete data management platform.
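
As a rough, hypothetical sketch of what "plugs into any Spark job as a data source" could look like in PySpark: the "delta" format name and S3 paths below are assumptions, since the post does not document the API.

# Append new records to a Delta table stored in the user's own S3 bucket.
events = spark.read.json("s3://my-bucket/raw/events/")      # placeholder input

(events.write
       .format("delta")                                     # assumed data source name
       .mode("append")
       .save("s3://my-bucket/delta/events"))                # placeholder table path

# Query the same table with batch reads (or spark.readStream for streaming).
spark.read.format("delta").load("s3://my-bucket/delta/events").createOrReplaceTempView("events")
spark.sql("SELECT COUNT(*) FROM events").show()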

Stay tuned for a more detailed technical discussion of Delta in future blog posts.

A Sample Use Case: Real-Time InfoSec

As Ali Ghodsi, CEO of Databricks, mentioned in his keynote at Spark Summit Europe, Delta is already in use with some of our largest customers. Let’s walk through the use case of a Databricks Fortune 100 customer already processing trillions of records per day in production with Delta. Here are their requirements:

  • Large ingest volume at low latency: Delta tables need to be able to ingest trillions of records per day with second to minute latency.
  • Data correctness and transactional updates: Data must be correct and consistent. Partial and failed writes should never show up in end user queries.
  • Fast, flexible queries on current and historical data: Analysts need to analyze petabytes of data with general-purpose languages like Python, in addition to SQL.

It took a team of twenty engineers over six months to build their legacy architecture that consisted of various data lakes, data warehouses, and ETL tools to try to meet these requirements. Even then, the team was only able to store two weeks of data in its data warehouses due to cost, limiting its ability to look backward in time. Furthermore, the data warehouses chosen were not able to run machine learning.

Using Delta, this company was able to put their Delta-based architecture into production in just two weeks with a team of five engineers.

Their new architecture is simple and performant. End-to-end latency is low (seconds to minutes) and the team saw up to 100x query speed improvements over open source Apache Spark on Parquet. Moreover, using Delta, the team is now able to run interactive queries on all its historical data — not just two weeks worth — while gaining the ability to leverage Apache Spark for machine learning and advanced analytics.

Getting Started with Delta

Delta is currently in the technical preview phase with multiple Databricks customers. This means that it is currently running in production, but we’re still ironing out some of the details on particularly challenging use cases with passionate customers. Delta won’t be generally available until early next year but if you’re interested in participating in the technical preview, please sign up on the Delta product page and we will be in touch!


While big data applications have become essential to all businesses, they are still too complex to build and slow to deliver. New models such as data lakes and the Lambda architecture have continuously been proposed to simplify data management. With Databricks Delta, we think we have finally made a significant leap towards this goal. Instead of adding new storage systems and data management steps, Delta lets organizations remove complexity by getting the benefits of multiple storage systems in one. By combining the best attributes of existing systems over scalable, low-cost cloud storage, we believe Delta will enable dramatically simpler data architectures that let organizations focus on extracting value from their data.


Try Databricks for free. Get started today.

The post Databricks Delta: A Unified Data Management System for Real-time Big Data appeared first on Databricks.


This is a community blog and effort from the engineering team at John Snow Labs, explaining their contribution to an open-source Apache Spark Natural Language Processing (NLP) library. The blog expounds on three top-level technical requirements and considerations for this library.

Apache Spark is a general-purpose cluster computing framework, with native support for distributed SQL, streaming, graph processing, and machine learning. Now, the Spark ecosystem also has a Spark Natural Language Processing library.

Get it on GitHub or begin with the quickstart tutorial.

The John Snow Labs NLP Library is under the Apache 2.0 license, written in Scala with no dependencies on other NLP or ML libraries. It natively extends the Spark ML Pipeline API. The framework provides the concepts of annotators, and comes out of the box with:

  • Tokenizer
  • Normalizer
  • Stemmer
  • Lemmatizer
  • Entity Extractor
  • Date Extractor
  • Part of Speech Tagger
  • Named Entity Recognition
  • Sentence boundary detection
  • Sentiment analysis
  • Spell checker

In addition, given the tight integration with Spark ML, there is a lot more you can use right away when building your NLP pipelines. This includes word embeddings, topic modeling, stop word removal, a variety of feature engineering functions (tf-idf, n-grams, similarity metrics, …) and using NLP annotations as features in machine learning workflows. If you’re not familiar with these terms, this guide to understanding NLP tasks is a good start.

Our virtual team has been building commercial software that heavily depends on natural language understanding for several years now. As such, we have hands-on experience with spaCy, CoreNLP, OpenNLP, Mallet, GATE, Weka, UIMA, nltk, gensim, Negex, word2vec, GloVe, and a few others. We are big fans, and the many places where we’ve imitated these libraries are intended as the sincere form of flattery that they are. But we’ve also banged our heads too many times against their limitations – when we’ve had to deliver scalable, high-performance, high-accuracy software for real production use.

This post describes the advantage of the John Snow Labs NLP library and the use cases for which you should consider it for your own projects.


The first of three top-level requirements we tackled is runtime performance. You’d think this was largely a solved problem with the advent of spaCy and its public benchmarks which reflect a well thought-out and masterfully implemented set of tradeoffs. However, when building Spark applications on top of it, you’d still get unreasonably subpar throughput.

To understand why, consider that an NLP pipeline is always just a part of a bigger data processing pipeline: for example, question answering involves loading training data, transforming it, applying NLP annotators, building features, training the value extraction models, evaluating the results (train/test split or cross-validation), and hyper-parameter estimation.

Splitting your data processing framework (Spark) from your NLP frameworks means that most of your processing time gets spent serializing and copying strings.

A great parallel is TensorFrames – which greatly improves the performance of running TensorFlow workflows on Spark data frames. This image is credited to Tim Hunter’s excellent TensorFrames overview:

Both Spark and TensorFlow are optimized to the extreme for performance and scale. However, since DataFrames live in the JVM and TensorFlow runs in a Python process, any integration between the two frameworks means that every object has to be serialized, go through inter-process communication (!) in both ways, and copied at least twice in memory. TensorFrames public benchmarks report a 4x speedup by just copying the data within the JVM process (and much more when using GPUs).

We see the same issue when using spaCy with Spark: Spark is highly optimized for loading & transforming data, but running an NLP pipeline requires copying all the data outside the Tungsten optimized format, serializing it, pushing it to a Python process, running the NLP pipeline (this bit is lightning fast), and then re-serializing the results back to the JVM process. This naturally kills any performance benefits you would get from Spark’s caching or execution planner, requires at least twice the memory, and doesn’t improve with scaling. Using CoreNLP eliminates the copying to another process, but still requires copying all text from the data frames and copying the results back in.

So our first order of business is to perform the analysis directly on the optimized data frames, as Spark ML already does (credit: ML Pipelines introduction post by Databricks):


Our second core requirement was frictionless reuse of existing Spark libraries. Part of it is our own pet peeve – why does every NLP library out there have to build its own topic modeling and word embedding implementations? The other part is pragmatic – we’re a small team under tight deadlines and need to make the most of what’s already there.

When we started thinking about a Spark NLP library, we first asked Databricks to point us to whoever was already building one. When the answer came back that there wasn’t one, the next ask was to help us make sure the design and API of the library fully meet Spark ML’s API guidelines. The result of this collaboration is that the library is a seamless extension of Spark ML, so that for example you can build this kind of pipeline:

val pipeline = new mllib.Pipeline().setStages(Array(
  docAssembler, tokenizer, stemmer,  // annotators from the com.jsl.nlp.* package
  hasher, idf,                       // feature stages from org.apache.spark.ml.feature.*
  dtree,                             // a spark.ml.classification.DecisionTreeClassifier
  labelDeIndex))                     // label de-indexer, also from org.apache.spark.ml.feature.*
                                     // (the original snippet was truncated; stage variable names are illustrative)

In this code, the document assembler, tokenizer, and stemmer come from the Spark NLP library – the com.jsl.nlp.* package. The TF hasher, IDF and labelDeIndex all come from MLlib’s org.apache.spark.ml.feature.* package. The dtree stage is a spark.ml.classification.DecisionTreeClassifier. All these stages run within one pipeline that is configurable, serializable and testable in the exact same way. They also run on a data frame without any copying of data (unlike spark-corenlp), enjoying Spark’s signature in-memory optimizations, parallelism and distributed scale out.

What this means is the John Snow Labs NLP library comes with fully distributed, heavily tested and optimized topic modeling, word embedding, n-gram generation, and cosine similarity out of the box. We didn’t have to build them though – they come with Spark.

Most importantly, it means that your NLP and ML pipelines are now unified. The above code sample is typical, in the sense that it’s not “just” an NLP pipeline – NLP is used to generate features which are then used to train a decision tree. This is typical of question answering tasks. A more complex example would also apply named entity recognition, filtered by POS tags and coreference resolution; train a random forest, taking into account both NLP-based features and structured features from other sources; and use grid search for hyper-parameter optimization. Being able to use a unified API pays dividends whenever you need to test, reproduce, serialize or publish such a pipeline – even beyond the performance and reuse benefits.

Enterprise Grade

Our third core requirement is delivering a mission-critical, enterprise-grade NLP library. We make our living building production software. Many of the most popular NLP packages today have academic roots – which shows in design trade-offs that favor ease of prototyping over runtime performance, breadth of options over simple minimalist API’s, and downplaying of scalability, error handling, frugal memory consumption and code reuse.

The John Snow Labs NLP library is written in Scala. It includes Scala and Python APIs for use from Spark. It has no dependency on any other NLP or ML library. For each type of annotator, we do an academic literature review to find the state of the art, have a team discussion and decide which algorithm(s) to implement. Implementations are evaluated on three criteria:

  • Accuracy – there’s no point in a great framework, if it has sub-par algorithms or models.
  • Performance – runtime should be on par or better than any public benchmark. No one should have to give up accuracy because annotators don’t run fast enough to handle a streaming use case, or don’t scale well in a cluster setting.
  • Trainability or Configurability – NLP is an inherently domain-specific problem. Different grammars and vocabularies are used in social media posts vs. academic papers vs. SEC filings vs. electronic medical records vs. newspaper articles.

The library is already in use in enterprise projects – which means that the first level of bugs, refactoring, unexpected bottlenecks and serialization issues have been resolved. Unit test coverage and reference documentation are at a level that made us comfortable to make the code open source.

John Snow Labs is the company leading and sponsoring the development of the Spark NLP library. The company provides commercial support, indemnification and consulting for it. This provides the library with long-term financial backing, a funded active development team, and a growing stream of real-world projects that drives robustness and roadmap prioritization.

Getting involved

If you need NLP for your current project, head to the John Snow Labs NLP for Apache Spark homepage or quickstart guide and give it a try. Prebuilt maven central (Scala) and pip install (Python) versions are available. Send us questions or feedback to nlp@johnsnowlabs.com or via Twitter, LinkedIn or GitHub.

Let us know what functionality you need next. Here are some of the requests we’re getting, and are looking for more feedback to design and prioritize:

  • Provide a SparkR client
  • Provide “Spark-free” Java and Scala versions
  • Add a state of the art annotator for coreference resolution
  • Add a state of the art annotator for polarity detection
  • Add a state of the art annotator for temporal reasoning
  • Publish sample applications for common use cases such as question answering, text summarization or information retrieval
  • Train and publish models for new domains or languages
  • Publish reproducible, peer reviewed accuracy and performance benchmarks

If you’d like to extend or contribute to the library, start by cloning the John Snow Labs NLP for Spark GitHub repository. We use pull requests and GitHub’s issue tracker to manage code changes, bugs and features. The library is still in its early days and we highly appreciate contribution and feedback of any kind.


Alex Thomas from Indeed and David Talby from Pacific AI for the initial design and code of this library.

Saif Addin Ellafi from John Snow Labs for building the first client-ready version of the library.

Eduardo Munoz, Navneet Behl and Anju Aggarwal from John Snow Labs for expanding the production grade codebase and functionality, and for managing the project.

Joseph Bradley and Xiangrui Meng from Databricks, for guidance on the Spark ML API extension guidelines.

Claudiu Branzan from G2 Web Services, for design contributions and review.

Ben Lorica from O’Reilly, for driving me to move this from idea to reality.


Try Databricks for free. Get started today.

The post Introducing the Natural Language Processing Library for Apache Spark appeared first on Databricks.


This is a guest post from Matt Hogan, Sr. Director of Engineering, Analytics and Reporting at McGraw-Hill Education.

McGraw-Hill Education is a 129-year-old company that was reborn with the mission of accelerating learning through intuitive and engaging experiences – grounded in research. When we began our journey, our data was locked in silos managed within teams and was secure, but not shared and not fully leveraged. The research needed to create machine learning models that could make predictions and recommendations to students and instructors was hard because the paths to the data needed to train those models were often unclear, slow, and sometimes completely closed off. To be successful, we first needed to change our organizational approach to burst through the silos that were preventing us from unlocking the full potential of shared, consumable information. We needed a culture of data. More precisely, a culture of secure data that protected the privacy of students and institutions that worked with us but allowed enough safe access to stoke intellectual curiosity and innovation across the organization. To guide our research, we founded the McGraw-Hill Education Learning Science Research Council (LSRC). The LSRC is chartered with guiding our own research as well as working with the education and technology communities to drive the field of learning science research forward.

Our transformation is underscored by our deep understanding of learning, including knowledge of how students and instructors leverage both content and learning management solutions. Through exploring how content and learning management systems are used coupled with understanding a student’s progress through material, we’re able to leverage fully anonymized data to drive adaptive models to engage with students and deliver differentiated instruction to drive better outcomes.  Aggregating, processing, developing models, and operationalizing those models as features in learning management systems requires a cohesive data engineering and data science platform, and Databricks has partnered with us to fill that engineering need. We have made some incredible strides forward by coupling the technological edge gained from Databricks with a concerted organizational shift to align security, DevOps, platform, and product teams around a new data-centric paradigm of operating that allows interdisciplinary teams to cross-cut through data silos and unlock insights that would have otherwise not been possible.

Democratizing Data and Machine Learning with Databricks

To test the efficacy of insights generated from learning science research, we first needed to democratize access to data and machine learning thereby reducing the cost of iterating on models and testing their effect in the market and on learners. Databricks provides us with a ubiquitous data environment that can be leveraged by business analysts, data engineers, and data scientists to work collaboratively toward a set of common goals.

Before bringing Databricks on, we had many Apache Spark clusters running on leased hardware. To get data to researchers, we created one-off extracts that would pull data from one of our product databases, scrub Personally Identifiable Information (PII), and store the data as flat files on one of the leased Spark clusters. This led to many problems including data quickly becoming stale, researchers working on models that were out of sync with the data in the extracted flat files, and no clear way to integrate the output of models developed by the data scientists.

After adopting Databricks, we could create secure connections between read-replicas of our learning systems and our data platform, ensuring that data was encrypted both in motion and at rest while allowing us to protect the confidentiality of learners. The connections allow researchers to have access to fresh data and current data models in a secure environment. Because the Spark systems are used by both engineering teams and researchers, output from the researchers could be leveraged to create new models and populate our data mart with new insights. As an added benefit, we could provide access to the business analytics teams who are now able to conduct their own research along with the data scientists on a single, unified analytics platform.

Iterating on the Connect Attrition Model

One of our first efforts catalyzed by Databricks was our Connect student attrition model – a classifier that can predict which students are at risk for dropping a course based on progress working through the material and outcomes on quizzes and assignments – within 2-4 weeks of a class starting. The McGraw-Hill Education Connect Platform is a highly reliable, secure learning management solution that covers homework, assessments, and course content delivery and does so leveraging many adaptive tools to improve student outcomes. While Connect appears to the world as a single, seamless platform and experience, it is made up of several pluggable technologies and products all generating their own data and metrics in real time. These components are hosted through a mix of in-house data centers and Amazon Web Services, making data aggregation, cleaning, and research a challenge.

To facilitate the data science work needed to train the model, we undertook several initiatives:

  • Created secure access to read-replicas of the Connect OLTP system
  • Created a new platform designed to facilitate rapid iteration and evaluation of models coupled with user experience and impact
  • Worked to change McGraw-Hill Education’s culture from one where data silos were the norm to one in which current, secure, anonymized data is made accessible to those who have a valid use for it
  • Designed a system leveraging encrypted S3 buckets (with Parquet in some cases) that researchers and developers could use to securely and easily move, evaluate, and read data using Spark SQL (a minimal sketch follows this list)
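
As a hypothetical illustration of that last item, a researcher notebook might read anonymized Parquet data from an encrypted S3 bucket and explore it with Spark SQL. The bucket, mount point, and column names below are placeholders.

# Mount the encrypted research bucket (placeholder bucket and mount point).
dbutils.fs.mount("s3a://mhe-research-data", "/mnt/research")

engagement = spark.read.parquet("/mnt/research/connect/engagement/")
engagement.createOrReplaceTempView("engagement")

spark.sql("""
  SELECT course_id, COUNT(DISTINCT student_id) AS active_students
  FROM engagement
  GROUP BY course_id
""").show()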

To establish a cycle of rapid feedback with researchers in the academic community as well as a select number of instructors, we created a platform called the Learning Science Dashboard (LSD). The LSD leverages our single sign-on solution to provide access to visualizations that researchers create using Databricks through reading from and writing to S3 buckets. Once the first level of evaluation was completed, we engaged with designers and engineers to build an operational notebook that could use the connector to query data from live classes in Connect and write predictions to a staging area in S3. Using the Databricks job scheduler made the operationalization of the model a trivial task. A new UI was deployed (this time using AngularJS) into the LSD and select instructors with access to Connect could log in and see an evaluation version of the report to gain insights on how their classes are performing.

Deploying and Market Testing the Model

The initial visualizations created by the data scientists conveyed important meaning, but were not designed to be consumed by instructors focused on quickly gaining insights on class performance. The first round of visualizations that we tested were created by data scientists in Databricks and exported to S3 buckets with controlled access through the LSD.

The second round of iterations included engineering teams and designers and were designed to quickly convey insights – allowing instructors to focus interventions where they were needed.

Thoughts in Closing

Using Databricks, we could securely transform ourselves from a collection of data silos with limited access to data and minimal collaboration to an organization with democratized access to data and machine learning, with data engineers, data scientists, business analysts, and front-end engineers all working in the same space and sharing ideas. By building out some complementary technologies and processes, we reduced the time to market and created a path by which we could quickly and easily validate both models and insights with external researchers and instructors in the classroom.

Furthermore, our legacy data pipelines (ETL) were decommissioned, resulting in a significant cost savings and simplification of how we run our infrastructure. In the past, we had provisioned EC2 instances running all the time – even when data was not moving. We also had leased hardware running large but statically sized Spark clusters at a cost of thousands per month. Through leveraging Databricks’ dynamic provisioning of instances/clusters when needed coupled with the notion of just-in-time data warehousing, we were able to reduce operational cost of our analytics infrastructure by 30%.


Try Databricks for free. Get started today.

The post Using Databricks to Democratize Big Data and Machine Learning at McGraw-Hill Education appeared first on Databricks.


Adding new software to an enterprise is a difficult process.  In the past, choosing new software only required budget approval before it could be adopted.  Today’s enterprises have adopted processes that require security approval before a new product is approved for purchase.  On the surface, it seems like more bureaucracy, but software has changed. Software is becoming more interconnected and companies are adopting more cloud-based platforms.  The normal methodology to keep software internal no longer applies to cloud platforms.  Security is no longer localized to a corporate infrastructure.  Instead, companies are deeply reliant on the vendor’s security practices to protect their data.  If the company’s vendor has weak security, it exposes the company to additional risk.  The distribution of security responsibilities requires companies to scrutinize their vendor’s security and forces their vendors to meet their own corporate security controls before they allow the usage of new products.

CISO organizations are typically seen as conservative and risk-averse due to the rigor they use to evaluate new products. Sometimes the process takes many months of answering questionnaires, attending security meetings, and running various penetration tests. In my experience on both sides of the fence, as a buyer and a seller of enterprise software, I believe CISOs evaluate three critical dimensions of your security program:

  1. Trust
  2. Technology
  3. Transparency

1. Trust

Trust requires good collateral from the vendor and a strong relationship with the vendor's security team, and it is not immediately earned. Customers trust leaders who have experience working with enterprise security teams. Leaders who have contributed to the security community, whether through publications or participation, immediately earn more trust from customers. Another area where customers gain trust is the maturity of the vendor's security program. A mature security program will have good security collateral and a long history of certifications.

The collateral should include a detailed description of the product: not just highlights in a white paper sprinkled with security keywords, but comprehensive white papers that explain the security methodology, the security architecture, and the product's security controls. These documents must be tailored to security professionals in specific domains. For example, there should be an application security section that discusses the vendor's secure software development lifecycle and shows how the product addresses the OWASP top vulnerabilities. Data handling must be defined from the client all the way to where the data is stored in the service. Documentation is the first component required to gain customer trust; the vendor must also share third-party audit reports.

Vendors must have third-party attestations to confirm that security controls are in place. A vendor can claim it practices good security, but a third-party audit report validates that the controls actually exist. Common forms of control validation are ISO 27001 certification and SOC 2 audit attestation. If there are regulatory requirements (e.g., GDPR, HIPAA, GLBA), these must be addressed in an additional audit report focused on the specific controls; in some cases, these controls are incorporated into the ISO or SOC 2 controls. At a minimum, vendors must have an audit report that covers more than six months of control data, such as a SOC 2 Type 2 report.

2. Technology

Choosing technology requires a deep understanding of the platform. Choosing a product simply to meet a compliance requirement doesn't necessarily protect the platform; it is important to consider the environment before choosing the tools to monitor and protect it. For example, a traditional Intrusion Detection System (IDS) that worked well in the data center will not be effective in a public cloud platform like Amazon Web Services (AWS), because in AWS there is no way to get complete access to the low-level network traffic that traditional IDS detection methods rely on. Instead, an AWS-specific service with access to the AWS control plane is more effective; only an AWS-native platform will have visibility into anomalies across the infrastructure.

Infrastructure monitoring is a requirement for effective protection. Monitoring must be in place that watches the network, systems, and logs. All of this data should be delivered to a protected, centralized platform that is designed to correlate these events and tuned to alert on specific types of activity. A properly configured system is tuned for normal behavior, and anything outside normal behavior alerts the security team.

Using logical segregation is an effective way to ensure data isolation. Data isolation keeps one customer's data from being shared with another customer, preventing accidental exposure. To properly isolate customers, vendors can configure a separate environment for each customer where that customer's data is stored. In AWS, a VPC can be configured per customer, and connectivity to the VPC can be provided through VPC peering, a VPN, or Direct Connect. Having multiple options is ideal, because a single type of connection may not meet every company's security requirements.

Encryption must be leveraged across the platform, especially in a public cloud environment. In a public cloud there are many shared resources, and encryption is the only way to adequately protect important data from being exposed. This applies to data in transit and data at rest. Data in transit must be protected by an encrypted channel such as TLS, both for connections from the client to the service and for connections between services inside the infrastructure.

Data stored in a public cloud must be encrypted, at a minimum with file-system-level encryption. In AWS, the Key Management Service (KMS) manages the keys used for this encryption and offers the flexibility to have the encryption key owned either by the customer or by the service provider. The advantage of allowing customers to control their own key is the confidence that, if they delete the key, their data is no longer retrievable, which matters especially if the storage is later repurposed for another customer.
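
As a rough illustration of what customer-managed keys look like in practice, here is a minimal sketch of writing an object to S3 with server-side encryption under a customer-managed KMS key, using the AWS SDK for Java (v1) from Scala. The bucket name, object key, file path, and key ARN are placeholders, not values from this post.

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{PutObjectRequest, SSEAwsKeyManagementParams}
import java.io.File

// Placeholders: substitute your own bucket, object key, local file, and KMS key ARN
val s3 = AmazonS3ClientBuilder.defaultClient()
val request = new PutObjectRequest("example-bucket", "customer-a/data.parquet", new File("data.parquet"))
  // Ask S3 to encrypt the object server-side with the customer-managed KMS key
  .withSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams("arn:aws:kms:us-east-1:111122223333:key/EXAMPLE"))
s3.putObject(request)

If the customer later disables or schedules deletion of that KMS key, the stored object can no longer be decrypted, which is exactly the guarantee described above.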

3. Transparency

Telling a customer that their data is safe without sharing how it is protected quickly puts the customer off. Sharing the methodology gives them the assurance that their data is protected. A mature security program should run under an information security management system (ISMS), which clearly defines the way security is practiced in the company. The ISMS should have provisions to share information with the customer in the form of audit reports and scan reports. Here is what should be shared:

Penetration test results — Making third-party penetration test results available to the customer will gain the trust of most corporate security teams. Third-party penetration tests must be performed more than once a year. In addition to reviewing a vendor-sponsored third-party penetration test, the customer should be able to perform its own tests on the vendor's platform within an agreed-upon scope.

Dynamic and static code scans — Vendors must perform dynamic and static code scans on their product on a regular cadence using an industry-recognized tool.  These reports should be shared no less than semi-annually.

Vulnerability scan results — Frequent network and system scans should be made available to customers.  These scans should be performed no less than quarterly.

Architecture details — Details of the environment must be shared with customers. They should be high-level enough that the customer can understand how the product is put together, including high-level diagrams of the services and the physical and logical divisions.

Data flow and persistence —  Share how the customer data flows through the vendor’s infrastructure.  The documentation must detail where, what, and how data is stored.

Access — The vendor must provide clear documentation and visibility into who has access to customer data and the safeguards that prevent unauthorized users from accessing it.

Building a security program around the 3Ts (Trust, Technology, and Transparency) allows technology companies to partner with CISOs to extend their security model and protect their customers.


Try Databricks for free. Get started today.

The post 3 Things CISO’s expect from Tech Companies in a Cloudy World appeared first on Databricks.


This is the seventh post in a multi-part series about how you can perform complex streaming analytics using Apache Spark and Structured Streaming.


Most data streams, though continuous in flow, are made up of discrete events, each marked by a timestamp indicating when the event occurred. As a consequence, this idea of "event-time" is central to how the Structured Streaming APIs are designed for event-time processing and to the functionality they offer for processing these discrete events.

Event-time basics and event-time processing are adequately covered in the Structured Streaming documentation and in our anthology of technical assets on Structured Streaming, so for brevity we won't cover them here. Building on the concepts developed (and tested at scale) in event-time processing, such as sliding windows, tumbling windows, and watermarking, this blog will focus on two topics:

  1. How to handle duplicates in your event streams
  2. How to handle arbitrary or custom stateful processing

Dropping Duplicates

No streaming event source is free of duplicate entries. Dropping duplicate entries in record-at-a-time systems is imperative, and often a cumbersome operation, for a couple of reasons. First, you have to process small or large batches of records at a time to discard them. Second, some events, because of high network latencies, may arrive out-of-order or late, which may force you to reiterate or repeat the process. How do you account for that?

Structured Streaming, which ensures exactly-once semantics, can drop duplicate messages as they come in, based on arbitrary keys. To deduplicate data, Spark maintains state for a number of user-specified keys and ensures that duplicates, when encountered, are discarded.

Just as other stateful processing APIs in Structured Streaming are bounded by declaring a watermark for late-data semantics, so is dropping duplicates. Without a watermark, the maintained state can grow infinitely over the course of your stream.

The API to instruct Structured Streaming to drop duplicates is as simple as all the other APIs we have shown so far in our blogs and documentation. Using the API, you can declare arbitrary columns on which to drop duplicates, for example, user_id and timestamp. An entry with the same timestamp and user_id is marked as a duplicate and dropped, but the same entry with two different timestamps is not.
Let's see an example of how to use this simple API to drop duplicates. In the snippets below, withEventTime stands in for a streaming DataFrame of events with User and event_time columns, and the deduplicated, per-user counts are written to an in-memory table named deduplicated so they can be queried.

import org.apache.spark.sql.functions.expr

// `withEventTime` stands in for the streaming DataFrame of events (see note above)
withEventTime
  .withWatermark("event_time", "5 seconds")
  .dropDuplicates("User", "event_time")
  .groupBy("User").count()
  .writeStream.queryName("deduplicated").format("memory").outputMode("complete").start()

from pyspark.sql.functions import expr

withEventTime\
  .withWatermark("event_time", "5 seconds")\
  .dropDuplicates(["User", "event_time"])\
  .groupBy("User").count()\
  .writeStream.queryName("deduplicated").format("memory").outputMode("complete").start()

While the query is running, if you issue a SQL query against the deduplicated table, you will get accurate results, with all duplicates dropped.

SELECT * FROM deduplicated

|User|count|
|   a| 8085|
|   b| 9123|
|   c| 7715|
|   g| 9167|
|   h| 7733|
|   e| 9891|
|   f| 9206|
|   d| 8124|
|   i| 9255|

Next, we will expand on how to implement a customized stateful processing using two Structured Streaming APIs.

Working with Arbitrary or Custom Stateful Processing

Not all event-time based processing is equal or as simple as aggregating a specific data column within an event. Other events are more complex: they require processing rows of events ascribed to a group, and they only make sense when processed in their entirety, emitting either a single result or multiple rows of results, depending on your use case.

Consider these use cases where arbitrary or customized stateful processing becomes imperative:
1. We want to emit an alert based on a group or type of events if we observe that they exceed a threshold over time.
2. We want to maintain user sessions over a definite or indefinite period of time and persist those sessions for later analysis.

All of the above scenarios require customized processing. Structured Streaming offers two APIs to handle these cases: mapGroupsWithState and flatMapGroupsWithState. mapGroupsWithState operates on groups and outputs only a single result row for each group, whereas flatMapGroupsWithState can emit zero, one, or multiple rows of results per group.
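
For orientation, here is a rough sketch of the shapes of the user functions the two APIs expect; the type aliases below are illustrative only and not part of the Spark API.

import org.apache.spark.sql.streaming.GroupState

// K = key type, V = input row type, S = state type, U = output type
type MapGroupsFunc[K, V, S, U] = (K, Iterator[V], GroupState[S]) => U                    // exactly one result per group
type FlatMapGroupsFunc[K, V, S, U] = (K, Iterator[V], GroupState[S]) => Iterator[U]      // zero or more results per group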

Timeouts and State

One thing to note is that because we manage the state of a group based on user-defined concepts, as expressed in the use cases above, the semantics of the watermark (expiring or discarding an event) may not always apply here. Instead, we have to specify an appropriate timeout ourselves. A timeout dictates how long we should wait before timing out some intermediate state.

Timeouts can be based either on processing time (GroupStateTimeout.ProcessingTimeTimeout) or on event time (GroupStateTimeout.EventTimeTimeout). When using timeouts, check for a timeout before processing the values by inspecting the flag state.hasTimedOut.

To set a processing-time timeout, use the GroupState.setTimeoutDuration(...) method (see the sketch after the list below). With this setting, the timeout guarantees are as follows:

  • The timeout will never occur before the clock has advanced by the X ms specified in the method
  • The timeout will eventually occur once there is a trigger in the query after X ms have passed
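
To make that flow concrete, here is a minimal sketch of an update function that uses a processing-time timeout; the Event, SessionState, and updateWithTimeout names are hypothetical and are not part of the worked example later in this post.

import org.apache.spark.sql.streaming.GroupState

// Hypothetical input and state types, for illustration only
case class Event(sessionId: String, value: Long)
case class SessionState(var total: Long)

def updateWithTimeout(
    sessionId: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): Option[SessionState] = {
  if (state.hasTimedOut) {
    // No new events arrived within the timeout: emit the final state and drop it
    val expired = state.getOption
    state.remove()
    expired
  } else {
    // Fold the new events into the existing (or fresh) state
    val current = state.getOption.getOrElse(SessionState(0L))
    current.total += events.map(_.value).sum
    state.update(current)
    // Expire this group if no new events arrive for 30 seconds of processing time
    state.setTimeoutDuration("30 seconds")
    Some(current)
  }
}

A function of this shape would be registered with .groupByKey(_.sessionId).mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateWithTimeout); with GroupStateTimeout.EventTimeTimeout you would call state.setTimeoutTimestamp(...) instead and declare a watermark on the stream, as described next.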

To set an event-time timeout, use GroupState.setTimeoutTimestamp(...). Only for event-time timeouts must you also specify a watermark. All events in the group older than the watermark are filtered out, and the timeout occurs once the watermark has advanced beyond the set timestamp.

When a timeout occurs, the function you supplied in the streaming query is invoked with three arguments: the key by which you keep the state, an iterator of input rows, and the old state. The example with mapGroupsWithState below defines the functional classes and objects used.

Example with mapGroupsWithState

Let's take a simple example where we want to find out when (the timestamp) a user performed his or her first and last activity in a given dataset in a stream. In this case, we will group on (or map over) a user key and activity key combination.

But first, mapGroupsWithState requires a number of functional classes and objects:
1. Three class definitions: an input definition, a state definition, and optionally an output definition.
2. An update function based on a key, an iterator of events, and a previous state.
3. A timeout parameter as described above.

So let's define our input and state data structure definitions (the state class also serves as our output):

case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

Based on a given input row, we define our update function

def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  // no timestamp, just ignore the event
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  // does the activity match the one already tracked in the state?
  if (state.activity == input.activity) {
    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    // some other activity: move the window to this newer event
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }
  // return the updated state
  state
}

And finally, we write our function that defines the way state is updated based on an epoch of rows.

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}

def updateAcrossEvents(user:String,
    inputs: Iterator[InputRow],
    oldState: GroupState[UserState]):UserState = {
  // if no prior state exists, start from an empty activity and placeholder dates
  // that we can compare against and immediately update based on the values in our data
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
      "",
      new java.sql.Timestamp(6284160000000L),
      new java.sql.Timestamp(6284160L))

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

With these pieces in place, we can now use them in our query. As discussed above, we have to specify a timeout so that the method can time out a given group's state, and we can control what should be done with the state when no update is received after a timeout. For this illustration, we will maintain state indefinitely.

import org.apache.spark.sql.streaming.GroupStateTimeout

// `withEventTime` stands in for the streaming DataFrame of activity events used earlier in this series
withEventTime
  .selectExpr("User as user", "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity").as[InputRow]
  .groupByKey(_.user) // group the state by user key
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream.queryName("events_per_window").format("memory").outputMode("update").start()

We can now query our results in the stream:

SELECT * FROM events_per_window order by user, start

And here is a sample result showing each user's activity along with its first and last timestamps:

|user|activity|               start|                 end|
|   a|    bike|2015-02-23 13:30:...|2015-02-23 14:06:...|
|   a|    bike|2015-02-23 13:30:...|2015-02-23 14:06:...|
|   b|    bike|2015-02-24 14:01:...|2015-02-24 14:38:...|
|   b|    bike|2015-02-24 14:01:...|2015-02-24 14:38:...|
|   c|    bike|2015-02-23 12:40:...|2015-02-23 13:15:...|
|   d|    bike|2015-02-24 13:07:...|2015-02-24 13:42:...|

What's Next

In this blog, we expanded on two additional functionalities and APIs for advanced streaming analytics. The first lets you remove duplicates, bounded by a watermark. With the second, you can implement customized stateful aggregations beyond event-time basics and event-time processing.

Through an example using the mapGroupsWithState API, we demonstrated how you can implement a customized stateful aggregation for events whose processing semantics are defined not only by timeouts but also by user semantics and business logic.

In our next blog in this series, we will explore advanced aspects and use cases of flatMapGroupsWithState, which will also be covered at Spark Summit EU in Dublin in a deep-dive session on Structured Streaming.

Read More

Over the course of Structured Streaming's development and releases since Apache Spark 2.0, we have compiled a comprehensive compendium of technical assets, including our Structured Streaming series of blogs. You can read the relevant assets in the Structured Streaming documentation and on the Databricks blog.

Try the latest Structured Streaming APIs in Apache Spark on Databricks' Unified Analytics Platform.


Try Databricks for free. Get started today.

The post Arbitrary Stateful Processing in Apache Spark’s Structured Streaming appeared first on Databricks.

Databricks by Eiti Kimura and Flavio Clésio - 4M ago

This is a guest post from Movile. Eiti Kimura and Flavio Clésio share their highlights and what they’re looking forward to the most at Spark Summit EU in Dublin, Ireland.

About the Authors:
Eiti Kimura has over 15 years of experience working with software development. He holds a Master’s Degree in Electrical Engineering. He has vast experience with back-end systems for carrier billing services. He currently works as IT Coordinator at Movile Brazil.

Flavio Clésio is a specialist in machine learning and revenue assurance at Movile, where he helps build core intelligent applications to maximize revenue opportunities and automate decision making. He holds a Master's degree in computational intelligence applied to financial markets and currently works as Head of Machine Learning at Movile Brazil.

Spark Summit is nearly here and we are excited to cross the ocean and head to Dublin, Ireland. It is a new world for us tropical-country guys: new weather (not so tropical), a new city, conference, people, and culture. It promises to be a very rich experience.

Networking with the community and attending presentations to pick up new information and ideas are always the best parts of going to a conference.


Here are some of the talks that we are looking forward to attending:

At Movile, as we take major steps to migrate our ETL processes from a traditional RDBMS to Apache Spark, we'll be learning from Brandon Carl at Facebook, whose presentation grabbed our attention: "Lessons learned developing and managing massive (300 TB+) Apache Spark pipelines in production".

We're living in an era where hardware is cheaper than ever before, and the right question is not whether we can get hardware to deliver good machine learning models, but which model is best to put in production to serve the business. For that reason, we'll attend the session by Marcin Kulka and Michał Kaczmarczyk from 9LivesData called "No More Cumbersomeness: Automatic Predictive Modeling on Apache Spark".

In our opinion, the talk by Ramya Raghavendra from IBM, "Improving Traffic Prediction Using Weather Data", will teach us how to solve a really hard problem using Apache Spark from both the engineering and machine learning perspectives.

At Movile we’re dealing with several projects using Apache Spark and Redis to solve a myriad of problems, and we believe that Dvir Volk from Redis Labs will provide us with some interesting knowledge about how to put together the best setup in their talk called “Getting Ready to use Redis with Apache Spark”.

For everyone interested in streaming (and who isn't these days?), the presentation by Tathagata Das from Databricks called "Easy, Scalable, Fault-Tolerant Stream Processing with Structured Streaming in Apache Spark" is an excellent choice. He will show us how to read from Kafka and parse JSON payloads in less than 10 lines.

Have you ever faced a situation with poor performance? We have! And that is why this presentation caught our attention. Luca Canali will talk about the methods and tools for troubleshooting Spark workloads at scale: “Apache Spark Performance Troubleshooting At Scale, Challenges, Tools, And Methodologies“.

We hope you liked our highlights. And last but not least, Flavio and I will be talking about how we have saved more than US$3 million in revenue leakage with an intelligent monitoring solution built with machine learning using Apache Spark MLlib: "Preventing Revenue Leakage And Monitoring Distributed Systems With Machine Learning".

We hope to see you in Dublin!


Try Databricks for free. Get started today.

The post Crossing The Ocean for Spark Summit EU appeared first on Databricks.



This year's Spark Summit Europe is poised to be the biggest yet, and we here at Databricks could not be more excited to be hosting the event. The level of participation and continued growth of the summit is a clear indicator of how critical Apache Spark has become for many organizations all over Europe. Just since 2015, we've seen the number of companies represented at Spark Summit almost double to over 2,100, with more than 6,000 registered attendees, including developers, engineers, data scientists, researchers, and business professionals, all there for three days of in-depth learning and networking.

Much of this growth can be attributed to the incredible increase in the number of businesses leveraging Spark and Databricks for advanced analytics to support initiatives in machine learning, AI, and more — rapidly accelerating innovation and allowing these organizations to realize transformative business outcomes.

For anyone involved with Big Data and Data Science in Europe, this summit should be viewed as an incredible opportunity to learn more about Spark from thought leaders in both disciplines. The three-day summit includes a dedicated training day with tracks focused on Data Science with Spark, Tuning best practices, and Deep learning with Keras, Tensorflow, and Spark. You can check out the agenda here and start planning which tracks to attend. The following two days of the conference offer over 100 different sessions and dedicated Developer, Data Science, Enterprise, AI and Streaming tracks. There will be relevant and engaging content for every level and role within the community from beginner to advanced.

The keynote presenters are true innovators in their fields and offer an amazing level of insight into Spark adoption and success at their companies, including:

  • Hotels.com Chief Data Science Officer Matt Fryer will discuss how Databricks has helped Hotels.com use machine learning to not only deliver a more personalized and engaging customer experience, but also enable agile and impactful business decision making.
  • Daniel Jeavons, General Manager of Advanced Analytics for Shell Research, will share how Shell is improving supply chain efficiencies and inventory management globally thanks to Databricks.
  • Leah McGuire, a data scientist and principal team member from Salesforce, will be speaking on leveraging Spark in the development of platforms to enable the integration of machine learning into Salesforce products.

This is an incredible time to be working with advanced analytics and Databricks is both humbled and proud to be able to host Spark Summit Europe, support the Spark community, and help further the field of data science. We look forward to seeing you at the summit! Use code “DATABRICKS” and save 15% off your registration fee*.

*Promotional codes must be used at the original time of purchase.


Try Databricks for free. Get started today.

The post The Biggest EU Summit Ever: Over One Hundred Presentations, Two Conference Days, and a Day of Training appeared first on Databricks.
