Hi, Game of Thrones aficionados, welcome to GoT Series 8 and my tweet analysis! If you missed any of the prior season episodes, here are I, II and III. Finally, after almost two years, we have a new series and something interesting to write about! If you didn't watch Episode 1, do it before reading this post as it might contain spoilers!

Let's start with a preview of the opening scene of Episode 2.

If you followed the previous season blog posts you may remember that I was using Kafka Connect to source data from Twitter, doing some transformations with KSQL and then landing the data in BigQuery using Connect again. On top of it, I was using Tableau to analyze the data.

The above infrastructure was working fine, and I was able to provide insights like the sentiment per character and the "game of couples", analysing how a second character mentioned in the same tweet could change the overall sentiment.

The sentiment scoring was however done at visualization time, with the data extracted from BigQuery into Tableau at tweet level, scored with an external call to R, then aggregated and finally rendered.

As you might imagine, the solution was far from optimal since:

  • The Sentiment scoring was executed for every query sent to the database, so possibly multiple times per dashboard
  • The data was extracted from the source at tweet level, rather than aggregated

The dashboard was indeed slow to render and its memory consumption huge (think of the data volumes being moved around). Furthermore, the sentiment scores lived only inside Tableau: if any other person, application or visualization tool wanted to use them, they had to be recalculated from scratch.

My question was then: where should I calculate Sentiment Scores in order to:

  • Do it only once per tweet, not for every visualization
  • Provide them to all the downstream applications

The answer is simple: I need to do it as close to the source as possible, in Apache Kafka!

Sentiment Scoring in Apache Kafka

There are a gazillion different ways to implement Sentiment Scoring in Kafka, so I chose a simple method based on Python and Google's Natural Language API.

Google Natural Language API

Google's Natural Language API is a simple interface over a pre-trained machine learning model for language analysis and, as part of the service, it provides sentiment scoring.

The Python implementation is pretty simple: you just need to import the correct packages

from google.cloud import language_v1
from google.cloud.language_v1 import enums

Instantiate the LanguageServiceClient

client = language_v1.LanguageServiceClient()

Package the tweet string you want to be evaluated in a Python dictionary

content = "I'm Happy, #GoT is finally back!"
type_ = enums.Document.Type.PLAIN_TEXT
document = {'type': type_, 'content': content}

Then call the API and parse the response

response = client.analyze_sentiment(document)
sentiment = response.document_sentiment
print('Score: {}'.format(sentiment.score))
print('Magnitude: {}'.format(sentiment.magnitude))

The result is composed of a Sentiment Score and a Magnitude:

  • Score indicates the emotion associated with the content as Positive (value > 0) or Negative (value < 0)
  • Magnitude indicates the strength of that emotion, and is often proportional to the content length.

Please note that Google's Natural Language API is priced per document so the more content you send for scoring, the bigger your bill will be!
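
For convenience, the calls above can be wrapped into a small helper. This is just a sketch of my own, reusing the same client and calls shown in the snippets above; the score_tweet name is only for illustration.

from google.cloud import language_v1
from google.cloud.language_v1 import enums

client = language_v1.LanguageServiceClient()

def score_tweet(content):
    # package the text, call the Natural Language API and return (score, magnitude)
    document = {'type': enums.Document.Type.PLAIN_TEXT, 'content': content}
    sentiment = client.analyze_sentiment(document).document_sentiment
    return sentiment.score, sentiment.magnitude

print(score_tweet("I'm Happy, #GoT is finally back!"))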

Creating a Kafka Consumer/Producer in Python

Having settled how to do the sentiment scoring, it's time to look at how we can extract a tweet from Kafka in Python. Unfortunately, there is no Kafka Streams implementation in Python at the moment, so I created an Avro consumer/producer based on Confluent's Python Client for Apache Kafka. I used the jcustenborder/kafka-connect-twitter connector, so it's always handy to have the schema definition around when prototyping.

Avro Consumer

The implementation of an Avro Consumer is pretty simple: as always, first import the packages

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

then instantiate the AvroConsumer, passing the list of brokers, the group.id (useful, as we'll see later, for adding multiple consumers to the same topic) and the location of the Schema Registry service in schema.registry.url.

c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

The next step is to subscribe to a topic, in my case got_avro

c.subscribe(['got_avro'])

and start polling the messages in a loop

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    # poll() returns None when no message arrives within the timeout
    if msg is None:
        continue

    print(msg.value())

c.close()

In my case, the message was returned as JSON and I could extract the tweet Text and Id using the json package

import json

text = json.dumps(msg.value().get('TEXT'))
id = int(json.dumps(msg.value().get('ID')))

Avro Producer

The Avro Producer follows a similar set of steps, first including needed packages

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

Then we define the Avro key and value schemas. In my case, I used the tweet id as the key and included the text in the value together with the sentiment score and magnitude.

key_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "id",
       "type" : "long"
     }
   ]
}
"""
value_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "id",
       "type" : "long"
     },
     {
       "name" : "text",
       "type" : "string"
     },
     {
       "name" : "sentimentscore",
       "type" : "float"
     },
     {
       "name" : "sentimentmagnitude",
       "type" : "float"
     }
   ]
}
"""

Then it's time to load the key and value schemas and build the key and value objects

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
key = {"id": id}
value = {"id": id, "text": text,"sentimentscore": score ,"sentimentmagnitude": magnitude}

Next we create an instance of the AvroProducer, passing the broker(s), the Schema Registry URL and the key and value schemas as parameters

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

And finally we produce the event, also defining the topic that will contain it, in my case got_avro_sentiment.

avroProducer.produce(topic='got_avro_sentiment', value=value, key=key)
avroProducer.flush()

The overall producer/consumer flow is, needless to say, very simple, as sketched below.
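
This is a minimal sketch of my own putting the pieces together, reusing the consumer c, the producer avroProducer and the score_tweet helper sketched earlier in this post; the check for empty polls is my own addition.

import json

while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        print("Message deserialization failed: {}".format(e))
        break

    if msg is None:
        continue  # no message arrived within the poll timeout

    text = json.dumps(msg.value().get('TEXT'))
    id = int(json.dumps(msg.value().get('ID')))

    score, magnitude = score_tweet(text)  # Google Natural Language API call

    key = {"id": id}
    value = {"id": id, "text": text, "sentimentscore": score, "sentimentmagnitude": magnitude}
    avroProducer.produce(topic='got_avro_sentiment', value=value, key=key)
    avroProducer.flush()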

And it works!

Parallel Sentiment Scoring

One thing I noticed immediately, however, is that, especially during tweeting peaks, the scoring routine couldn't cope with the pace of the incoming tweets: a single Python consumer/producer was not enough. No problem! With Kafka, you can add multiple consumers to the same topic, right?

Of course you can! But you need to be careful.

Consumer Groups and Topic Partitions

You could create multiple consumers in different Consumer Groups (defined by the group.id parameter mentioned above), but by doing this you're telling Kafka that those consumers are completely independent, so Kafka will send each one a copy of every message. In our case, we'd simply end up scoring the same message N times, once per consumer.

If, on the other hand, you create multiple consumers with the same consumer group, Kafka will treat them as a single consuming process and will try to share the load amongst them. However, it will do so only if the source topic is partitioned, and it will associate each consumer exclusively with one (or more) topic partitions! To read more about this, check the Confluent documentation.
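
If you want to check how Kafka spreads the partitions across the members of a consumer group, you can pass an on_assign callback to subscribe(). This is a small debugging sketch of my own, not part of the original pipeline:

def print_assignment(consumer, partitions):
    # called whenever the group rebalances; shows which partitions this process owns
    print('Assigned partitions: {}'.format([p.partition for p in partitions]))

c.subscribe(['got_avro'], on_assign=print_assignment)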

The second option is what we're looking for: multiple processes reading from the same topic and splitting the tweet workload. But how do we split an existing topic into partitions? This is where KSQL comes in handy! If you don't know about KSQL, read this post!

With KSQL we can define a new STREAM sourcing from an existing TOPIC or STREAM, together with the desired number of partitions and the partition key (the key's hash will be used to deterministically assign a message to a partition). The code is the following

CREATE STREAM <NEW_STREAM_NAME> 
    WITH (PARTITIONS=<NUMBER_PARTITIONS>) 
    AS SELECT <COLUMNS> 
    FROM <EXISTING_STREAM_NAME>  
    PARTITION BY <PARTITION_KEY>;

A few things to keep in mind:

  • Choose the number of partitions carefully: more partitions for the same topic mean more throughput, but at the cost of extra complexity.
  • Choose the <PARTITION_KEY> carefully: if you have 10 partitions but only 3 distinct Keys, then 7 partitions will not be used. If you have 10 distinct keys but 99% of the messages have just 1 key, you'll end up using almost always the same partition.

Yeah! We can now create one consumer per partition within the same Consumer Group!

Joining the Streams

As the outcome of our process so far we have:

  • The native GOT_AVRO Stream coming from Kafka Connect, which we divided into 6 partitions using the tweet id as Key and named GOT_AVRO_PARTITIONED.
  • A GOT_AVRO_SENTIMENT Stream that we created using Python and Google's Natural Language API, with id as Key.

The next logical step would be to join them, which is possible with KSQL by including the WITHIN clause specifying the temporal validity of the join. The statement is, as expected, the following:

SELECT A.ID, B.ID, A.TEXT, B.SENTIMENTSCORE, B.SENTIMENTMAGNITUDE 
FROM GOT_AVRO_PARTITIONED A JOIN GOT_AVRO_SENTIMENT B 
    WITHIN 2 MINUTES 
    ON A.ID=B.ID; 

Please note that I left a two-minute window to take into account some delay in the scoring process. And, as you would expect, I get... 0 results!

Reading the documentation more carefully gave me the answer: input data must be co-partitioned in order to ensure that records having the same key on both sides of the join are delivered to the same stream task.

Since the GOT_AVRO_PARTITIONED stream had 6 partitions and GOT_AVRO_SENTIMENT only one, the join wasn't working. So let's create a 6-partitioned version of GOT_AVRO_SENTIMENT.

CREATE STREAM GOT_AVRO_SENTIMENT_PARTITIONED 
	WITH (PARTITIONS=6) AS 
	SELECT ID, 
		TEXT, 
		SENTIMENTSCORE, 
		SENTIMENTMAGNITUDE 
	FROM GOT_AVRO_SENTIMENT  
	PARTITION BY ID;

Now the join actually works!

The next topics are the pushdown to Google's BigQuery and visualization using Google's Data Studio! But those, sadly, will be for another post! See you soon, and enjoy Game of Thrones!


Oracle Analytics Cloud as an enabler for Data Science: this is the third post of a series which started with Episode I where we discussed the path from a Data Analyst to a Data Scientist and how to start a Data Science journey with OAC. Episode II was focused on Feature Engineering, Data Analytics and Machine Learning, showing how those steps can be performed in OAC using a visual and easily understandable interface.

This last post will focus on how to perform predictions in OAC and, lastly, analyze how the Data Science options provided by OAC can fit into a corporate Data Science strategy.

Step #6: Predict

In the second part of the blog series, we understood how to train a model and evaluate the scoring predictions via OAC. It's important to note that the various models we created are stored as independent objects in OAC: they will not be changed by modification or deletion of the DataFlow that originated them. We can set the sharing permissions directly in the Model Inspect window.

Note: It's worth mentioning that models are only identified by Model Name so multiple DataFlows generating models with the same name will be overwriting the same object.

As with model training, OAC provides two methods for prediction: On the Fly or via DataFlow. To use the On the Fly method in a project, do the following:

  • Select Create Scenario
  • Select an ML Model from the list
  • Associate the columns used for the Model training with columns coming from the dataset
  • Start using the predicted fields in one (or more) visualization

The On the Fly method works well, and multiple scenarios can be added at the same time; however, it has two disadvantages:

  • The prediction is done at run time: the Python code behind the model is executed for every analysis and after every refresh, which can make the visualization time-consuming and resource-hungry to deliver.
  • It's difficult to create new formulas combining predictions and the source dataset: providing a prediction is only part of the game; as Analysts, we may want to create complex calculations that depend both on columns coming from the predictions and on the original dataset. This is possible (the prediction is done via an EVALUATE_SCRIPT which can be found in the Developer view), but it's very hard and error-prone since the columns coming from the evaluation are not offered in the Project's code editor for new columns.

The second point is not an actual disadvantage of the method itself; it's mainly related to the editor implementation, which I hope will be fixed in a future release.

Both the above disadvantages are addressed by the "Batch" mode using DataFlow. In DataFlow there is the Apply Model step which follows the same logic as before:

  • Select Apply Model
  • Select an ML Model from the list
  • Associate the columns used for the Model training with columns coming from the dataset
  • Select which Columns from the prediction to include in the target dataset
  • Store the resulting dataset

The DataFlow method applies the predictions only once and stores them in a new dataset. Since the resulting dataset contains the columns from the original dataset as well as the ones from the prediction, any combined metric can easily be created as part of the DataFlow or directly in the Project visualization mode. It's worth mentioning that the DataFlow method can't be used in situations where real-time data is analyzed and the predictions need to happen at query time. However, Incremental DataFlows can be scheduled at intervals of a few minutes, so near-real-time analytics cases can still be addressed.

Final Considerations

Bringing Data Science to people, this is one of Oracle Analytics Cloud's missions: providing automated insights about datasets, training and evaluating Machine Learning Models in an easy and low-code interface, all included within the same tool Business Analysts use in their day to day job. OAC does a great job by offering a rich and powerful set of options via the same GUI Business Users are already experiencing.

Other Data Science Use Cases and Alternatives

We can't, however, expect OAC to cover all Data Science use cases. OAC is perfect for the initial Data Science enablement, providing tools to get deep insights on existing datasets and create fairly complex Machine Learning models. On the other hand, when dealing with massive amounts of data, when complex data transformations need to happen, or when parameter fine-tuning for the best possible performance is needed, OAC may not be the right tool.

When running any model training or prediction, OAC extracts the data, executes the training/scoring Python code on the OAC server, and then either displays or stores the result. OAC is basically moving the data to the Machine Learning code, which is inefficient for huge data volumes. In such cases, the opposite approach of moving the Machine Learning to the data, by using Oracle Advanced Analytics directly in the database, should be the selected one. Moreover, for more sophisticated use cases, usually in the hands of Data Scientists, Oracle provides Oracle Machine Learning, a Machine Learning SQL notebook based on Apache Zeppelin on top of Oracle Autonomous Data Warehouse.

It's also worth mentioning that, for typical code-based Data Scientist work, Oracle R Enterprise offers the powerful functions of R optimized for use in the Oracle Database. If you're more of a Python person, Oracle Advanced Analytics contains Oracle Machine Learning for Python (OML4Py), enabling you to run an optimized Python version directly in the Oracle Database.

The bonus of doing Machine Learning in the database is that predictions are usually only a function call away, so they can be packaged in a view and consumed as standard database objects by Oracle Analytics Cloud users. This allows us to mix and match various Machine Learning creation flows depending on the complexity, the amount of data and the skillset of the team involved. Looking ahead, Oracle Data Science Cloud is also coming soon, providing a collaborative end-to-end ML environment in the cloud.

Conclusion

Oracle Analytics Cloud is the perfect tool to speed up a company's path to Data Science: not only does it empower Business Users with Augmented Analytics, Machine Learning training and prediction flows, it also provides an easy interface to learn tasks like Data Cleaning, Feature Engineering and Model Selection & Evaluation, which are usually the preserve of Data Scientists. Furthermore, including OAC in an ecosystem with Oracle Advanced Analytics and Oracle Machine Learning provides a unique, consistent and connected solution capable of handling all of a company's Data Science needs.

In a future blog post series I'll be doing a technical deep dive on how ML works under the hood and how you can extend it with custom Models. Check this space for news!


Welcome back! In my previous post, I described how the democratization of Data Science is a hot topic in the analytical industry. We then explored how Oracle Analytics Cloud can act as an enabler for the transformation from Business Analyst to Data Scientist and covered the first steps in a Data Science project: problem definition, data connection & cleaning. In today's post, we'll cover the second part of the path: data transformation and enrichment, analysis, and machine learning model training and evaluation. Let's start!

Step #3: Transform & Enrich

In the previous post, we understood how to clean data in order to handle wrong values and outliers, perform aggregations and feature scaling, and divide our dataset between train and test. Cleaning the data, however, is only the first step in data processing, and should be followed by what in Data Science is called Feature Engineering.

Feature Engineering is a fancy name for what in ETL terms we have always called data transformation: we take a set of input columns and apply transformation rules to create new columns. The aim of Feature Engineering is to create good predictors for the following machine learning model. Feature Engineering is a bit of a black art, and achieving excellent results requires a deep understanding of the ML model we intend to use. However, most of the basic transformations are actually driven by domain knowledge: we should create new columns that we think will improve the problem explanation. Let's see some examples (a small pandas sketch of a few of them follows the list):

  • If we're planning to predict the Taxi Fare in New York between any two given points and we have source and destination, a good predictor for the fare probably would be the Euclidean distance between the two.
  • If we have Day/Month/Year on separate columns, we may want to condense the information in a unique column containing the Date
  • In case our dataset contains location names (Cities, Regions, Countries) we may want to geo-tag those properly with ZIP codes or ISO Codes.
  • If we have personal information like credit card details or a person's name, we may want to obfuscate it, or extract features like the person's sex from the name (on this topic, check the blog post about GDPR and ML from Brendan Tierney).
  • If we have continuous values like the person's age, do we think there is much difference between a 35, 36 or 37 year-old person? If not we should think about binning them in the same category.
  • Most Machine Learning Models can't cope with categorical data, thus we need to transform them to numbers (aka encoding). The standard process, when no ordering exists between the labels, is to create a new column for each value and mark the rows with 1/0 accordingly.
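
Outside OAC, a few of the transformations above would look roughly like the pandas sketch below; the dataset and column names are invented purely for illustration.

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'pickup_lat': [40.71, 40.76], 'pickup_lon': [-74.00, -73.98],
    'drop_lat': [40.73, 40.65], 'drop_lon': [-73.99, -73.78],
    'day': [14, 15], 'month': [4, 4], 'year': [2019, 2019],
    'age': [35, 62], 'payment_type': ['card', 'cash']})

# Euclidean distance between pickup and drop-off as a fare predictor
df['distance'] = np.sqrt((df.pickup_lat - df.drop_lat) ** 2 + (df.pickup_lon - df.drop_lon) ** 2)

# condense Day/Month/Year into a single Date column
df['date'] = pd.to_datetime(df[['year', 'month', 'day']])

# bin a continuous age into categories
df['age_band'] = pd.cut(df.age, bins=[0, 30, 50, 120], labels=['young', 'mid', 'senior'])

# encode categorical data as 1/0 columns
df = pd.get_dummies(df, columns=['payment_type'])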

Oracle Analytics Cloud again covers all the above cases with two tools. Euclidean distance, generic data transformations like data condensation, and binning are standard steps of the DataFlow component: we only need to set the correct parameters or write simple SQL-like statements. Moreover, for binning, there are options to do it manually as well as automatically with equal-width or equal-height bins, taking out the manual labour and the related bias.

On the other side, geo-tagging, data obfuscation and automatic feature extraction (like the person's sex based on the name) are things that, with most other tools, need to be resolved by hand, with complex SQL statements or dedicated Machine Learning efforts.

OAC again does a great job during the Data Preparation Recommendation step: after defining a data source, OAC will scan column names and values in order to find interesting features and propose some recommendations like geo-tagging, obfuscation, data splitting (e.g. Full Name split into First and Last Name) etc.

The accepted recommendations will be added to a Data Preparation Script that can be automatically applied when updating our dataset.

Step #4: Data Analysis

Data Analysis is declared as Step #4; however, since the Data Transformation and Enrichment phase we have started a circular flow aimed at optimizing our predictive model output.

The analysis is a crucial step for any Data Science project; in R or Python, one of the first steps is to check the dataset's head(), which shows a first overview of the data, as sketched below.
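
For reference, that first look in Python with pandas would be roughly the following; the file name is just a placeholder.

import pandas as pd

df = pd.read_csv('my_dataset.csv')  # placeholder file name
print(df.head())      # first rows: column names and sample values
print(df.describe())  # basic statistics for the numeric columns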

OAC does a similar job with the Metadata Overview, where we can see for each column the name, type and sample values, as well as the Attribute/Metric definition and associated aggregation, which we can then change later on.

Analysing Data is always a complex task and is where the expert eye of a data scientist makes the difference. OAC, however, can help with the excellent Explain feature. As described in the previous post, by right clicking on any column in the dataset and selecting Explain, OAC will start calculating statistics and metrics related to the column and display the findings in graphs that we can incorporate in the Data Visualization project.

Even more, there are additional tabs in the Explain window that provide Key Drivers, Segments and Anomalies.

  • Key Drivers provides the statistically significant drivers for the column we are examining.
  • Segments shows hidden groups in the dataset that can predict outcomes in the column
  • Anomalies performs outlier detection, showing which are the corner cases in our dataset

Some Data Science projects could already end here. If the objective was to find insights, anomalies or particular segments in our dataset, Explain already provides that information in a clear and reusable format. We can add the necessary visualization to a Project and create a story with the Narrate option.

If, on the other hand, our goal is to build a predictive model, then it's time to tackle the next phase: Model Training & Evaluation.

Step #5: Train & Evaluate

Exciting: now it's time to tackle Machine Learning! The first thing to do is to understand what type of problem we are trying to solve. OAC allows us to solve problems in the following categories:

  • Supervised, when we have a history of the problem's solution and we want to predict future outcomes; we can identify two subcategories:
    • Regression, when we are trying to predict a continuous numerical value
    • Classification, when we are trying to assign every sample to one of two or more categories
  • Unsupervised, when we don't have a history of the solution but we ask the ML tool to help us understand the dataset:
    • Clustering, when we try to label our dataset in categories based on similarity.

OAC provides two different ways to apply Machine Learning on a dataset: On the Fly or via DataFlows. The On the Fly method is provided directly in the data visualization: when we create any chart, OAC provides the option to add Clusters, Outliers, Trend and Forecast Lines.

When adding one of these Analytics, we have some control over the behaviour of the predictive model. For clustering, for example, we can decide which algorithm to implement (between K-means and Hierarchical Clustering), the number of clusters and the trellis scope in case we visualize multiple scatterplots, one for each value of a dimension.

Applying Machine Learning models on the fly is very useful and can provide some great insights; however, it suffers from a limitation: the columns analysed by the model are only the ones included in the visualization, and we have no control over other columns we may want to add to the model to increase prediction accuracy.

If we want to have granular control over columns, algorithm and parameters to use, OAC provides the Train Model step in the DataFlow component.

As described above OAC provides the option to solve Regression problems via Numeric Prediction, apply Binary or Multi-Classifier for Classification, and Clustering. There is also an option to train Custom Models which can be scripted by a Data Scientist, wrapped in XML tags and included in OAC (more about this topic in a later post).

Once we've selected the class of problem we're aiming to solve, OAC lets us select which Model to train between various prebuilt ones. After selecting the model, we need to identify which is the target column (for Supervised ML classes) and fix the parameters. Note the Train Partition Percent providing an automated way to split the dataset in train/test and Categorical/Numerical Column Imputation to handle the missing values. As part of this process, the encoding for categorical data is executed.

... But which model should we use? What parameters should we pick? One lesson I have learned about Machine Learning is that there is no golden model and parameter set that solves all problems. Data Scientists will try different models, compare them and tune parameters based on experimentation (aka trial and error).

OAC allows us to create an initial DataFlow, select a model, set the parameters, then save the DataFlow and model output. We can then restart by opening the DataFlow, changing the model or the parameters, and storing the artefacts under different names in order to compare them.

After creating one or more models, it's time to evaluate them: in OAC we can select a model and click Inspect. In the Overview tab, Inspect shows the model description and properties. Far more interesting is the Quality tab, which provides a set of model scoring metrics based on the test dataset created following the Train Partition Percent parameter. In the case of a Numeric Prediction problem, the Quality tab will show quality metrics like the Root Mean Squared Error for each model. OAC provides similar metrics no matter which ML algorithm you're implementing, making analysis and comparison easy.

In the case of Classification, the Quality Tab will show the confusion matrix together with some pre-calculated metrics like Precision, Recall etc.
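
To make those metrics concrete, here is how the same numbers could be computed by hand with scikit-learn; OAC calculates them for you behind the Quality tab, so this sketch (with made-up values) is only there to clarify what the figures mean.

import numpy as np
from sklearn.metrics import confusion_matrix, mean_squared_error, precision_score, recall_score

# numeric prediction: Root Mean Squared Error on the held-out test set
y_true = np.array([10.0, 12.5, 9.0])
y_pred = np.array([11.0, 12.0, 8.5])
print(np.sqrt(mean_squared_error(y_true, y_pred)))

# classification: confusion matrix, precision and recall
labels_true = [1, 0, 1, 1, 0]
labels_pred = [1, 0, 0, 1, 0]
print(confusion_matrix(labels_true, labels_pred))
print(precision_score(labels_true, labels_pred), recall_score(labels_true, labels_pred))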

The model selection then becomes an optimization problem for the metric (or set of metrics) we picked during the problem definition (see TEP in the previous post). After trying several models, parameters and features, we'll then choose the model that minimizes the error (or maximizes the accuracy) of our prediction.

Note: as part of the model training, it's very important to select which columns will be used for the prediction. A blind option is to use all columns, but adding irrelevant columns isn't going to provide better results and, for big or wide (huge number of columns) datasets, it becomes computationally very expensive. As written before, the Explain function provides the list of columns that represent statistically significant predictors. The columns listed there should form the basis of the model training.

Ok, part II done: we saw how to perform Feature Engineering, Model Training and Evaluation. Check my next post for the final piece of the Data Science journey: predictions and final considerations!


In the last few weeks, I had the chance to speak both at Analytics and Data Summit held in Oracle HQ in San Francisco and OUG Norway Spring Conference 2019 on a wavy cruise between Oslo and Kiel. The underlying topic of the presentations was just one: demonstrate how Oracle Analytics Cloud can be used to bridge the gap between Business Analysts and Data Science.

Imagine: you only need to connect to the data, and the tool starts giving you suggestions on enrichments and transformations, important drivers, segments and anomalies, and an easy way to create machine learning predictions. Well, that's Oracle Analytics Cloud!

But let's start from the beginning, let's define the problem.

Problem Definition

Let's try to dive into the subject by answering a few questions.

What is Data Science? Why should I be curious about that?

Data science, as per Wikipedia, is a multi-disciplinary field that uses scientific methods, processes, algorithms and systems to extract knowledge and insights from structured and unstructured data.

Why should you be interested? Well, I'm not going to reply to this question directly, I'll leave it to Gartner. In a recent publication, they claimed that Artificial Intelligence adoption has grown by 270% in the last four years, with the share of companies implementing it rising from 10% to 37%. Data Science represents the basics of Artificial Intelligence: the spread of successful AI projects is directly related to the correct application of Data Science principles to raw or curated datasets. Again, as Gartner writes, "By 2023, AI and deep-learning techniques will be the most common approaches for new applications of Data Science".

I'm a Business Analyst, I'm an expert in my field, why do I need Data Science?

I see three main drivers to answer this question:

  • Complexity of the data: nowadays we are working on top of huge volumes of highly dimensional datasets. Traditional tools (like Excel) and methods (visual analysis) can no longer fulfil the analytical needs: data is too complex to be extracted, manipulated and analyzed manually. Thus Data Science techniques and Machine Learning need to be used in the analytical routines to automate parts of the daily workflow of insight generation.
  • Personal Bias: every person has a history that serves as the basis for their daily work. What if your prebuilt knowledge actually poses an obstacle to the discovery of new insights? With Data Science you can discover what is statistically significant in the data versus what you think is significant.
  • Future Work Opportunities: I leave this driver to an article of insidebigdata.com. The Data Scientist skillset is rare and there is a boom in demand. What are you waiting for?

I'm a Business Analyst, can't I just rebrand myself as Data Scientist?

There is no single definition of a Business Analyst or a Data Scientist; it hugely depends on prior knowledge and study, as well as on the context the person is working in. However, there have been multiple attempts at defining the Data Scientist's skills over the years, with the Venn diagram below, from drewconway.com, being quite famous.

The diagram is already showing some common points and differences between the general idea of Business Analyst and the Data Scientist:

  • Substantive Expertise: this skillset is a must-have in both roles. Even if Data Science standard techniques can be applied in every context, domain knowledge is required to gain better insights into the data.
  • Math & Statistics Knowledge: this is the first difference; the math used in Data Science is probably more advanced than that used in the day-to-day job of a Business Analyst. As per datascience.com's suggestion, the #1 step to go from Data Analyst to Data Scientist is to refresh your statistics.
  • Hacking Skills: this is where the biggest difference lies. Data Scientists mostly use scripting languages like R and Python, which are not traditionally part of the skillset of a Business Analyst, who instead uses visual tools.

Summarizing, there is quite some difference between the skillset traditionally associated with a Business Analyst and the Data Scientist one. A simple "rebranding" is not enough.

How can I empower my Business Analysts with Data Science Skills?

This is what is called Data Science Democratization: using tools and techniques to lower the barrier to Data Science!

One of the drivers of Data Science Democratization is Augmented Analytics! Gartner defines Augmented Analytics as the Future of Data Analytics, but what is it?

Augmented Analytics

It's the concept of using Machine Learning to automate some steps of data preparation, insights discovery and sharing. Those steps are included in the Analytical platform already used by Business Analysts.

Let's see an example: a typical task of a Business Analyst is to find the significant drivers of Sales. In the usual flow, based on the BA's prior knowledge, experience drives the analysis and thus the insight generation. However, as we discussed before, this brings a personal bias that could stop the BA from spotting new and hidden patterns. What if the analytical platform itself could suggest, based on the dataset, which drivers are statistically significant? Those drivers can then be verified by the Business Analyst and maybe generate new insights.

Low-code Machine Learning

The second trend in Data Science Democratization is empowering Business Users to create low-code Machine Learning models, in the same GUI-driven tool they use for their day-to-day job.

Let's see another example: a Business Analyst wants to classify clients based on whether they'll accept an offer. With no prior knowledge, he starts from a 50/50 position (a 50% chance a client will say yes or no). The traditional approach was either based on personal experience (with the related bias problem) or required the involvement of a Data Scientist. What if, instead, with a series of simple, visual and repeatable steps he could create an ML model correctly predicting 70% of the customers accepting the offer? The model will not be perfect, but it still gives the option of focusing the calls on the clients more likely to accept the offer, potentially generating a 20% increase in sales.

How Does Oracle Analytics Cloud Bridge the Gap to Data Science?

Oracle Analytics Cloud offers a series of features enabling Data Science; I blogged about them in 2018 when they were first released. Let's now see how a Data Science process could be executed with OAC.

Step #0: Problem Definition

The basic step in Data Science actually doesn't require any tool: it's the Problem Definition. There is no general-purpose Data Science! We need to define precisely what problem we're going to solve. A good approach is to define it using TEP: Task, Experience and Performance.

  • Task represents the problem we're trying to solve: Classify Spam/Not-Spam, predict the temperature in the room
  • Experience represents the historical background that we'll use: Corpus of emails already classified as Spam/Not-Spam, history of hourly room temperatures
  • Performance is the metric we are using to understand how good/bad our prediction is. There is a multitude of metrics we can use, depending on the type of problem we are trying to solve. The metric(s) selection will guide the decisions in our Data Science process.

Step #1: Data Connection

This is the first step in the tool itself, and, I believe, very familiar for any Business Analyst working with Oracle Analytics Cloud. OAC provides users with the ability to connect to:

  • Curated Subject Areas coming from a prebuilt repository in the hands of the IT department where data is sourced, modelled and exposed following precise business logic and KPI definitions. The importance of a central curated data repository is detailed here
  • Raw Data Sources: do you have data in Excel, MongoDB, Kafka, Dropbox? No problem, OAC allows you to connect to it and start analysing within a few clicks! All you need to specify is the connection URL, credentials and parameters.
  • Both: do you want to mix curated KPIs with data coming from a variety of datasets? OAC covers this case by allowing you to mash them up!

Step #2: Clean

Data cleaning: I guess many Business Analysts are already familiar with this. What, on the other hand, are the typical cleaning steps a data scientist performs?

The usual steps are the following (a short Python sketch of a few of them follows the list):

  • Handle Missing Values: What does a missing value mean? How should I treat it? Understanding and handling missing values is fundamental, since it can enhance our comprehension of the dataset. Should I remove samples with missing values? Should I substitute them with a default value? The answer depends on the value's meaning.
  • Correcting Wrong Values: free text is easy to implement and a nightmare to fix! Think of the same content, MARK, written in two different ways (capital and lower case A). To a human eye the two values are the same, but to a computer they belong to two different classes. If we want to optimise the output of our model, we should optimise the inputs by correcting the wrong values.
  • Remove Irrelevant Observations: if we are trying to make predictions for the city of Milan in Italy, are the observations for Rome relevant? Or the ones for Milan in Michigan, USA? If not, we should remove them, because a) the following Machine Learning model could learn behaviours which are not relevant and b) the related process will run faster since it will need to handle less data.
  • Labelling Columns: especially when connecting to raw datasources we can't expect the columns to be properly named, e.g. Col1 containing ClientName. The Machine Learning tools probably won't care about column names, but you do, in order to explain their behaviour.
  • Handling Outliers: outliers represent corner cases of our dataset. Is the CIO's salary relevant if we are trying to predict a person's salary based on the years of experience in the company? Probably not, since the CIO's salary will be far away from any other salary. Trying to fit all salaries including the CIO's one degrades the model, while excluding that corner case clearly improves the fit of our predictive model.
  • Feature Scaling: some of the predictive models in data science try to optimise the Euclidean distance (the distance between the predicted points and the actual points). If we have features on different scales (e.g. the number of bedrooms in a house and the house price), those models will put far more effort into optimising the feature with the larger scale, since the Euclidean distance gain there will be much higher than when optimising the feature with the smaller scale.
  • Aggregations: most of the time the dataset we are faced with will have a different granularity to the one we intend to work with; just imagine working with a website clickstream while trying to predict customer purchases. We'll probably want to aggregate the clickstream at session or user level and count the number of clicks, the time within a session or between sessions.
  • Train/Test Split: it's very important to test our predictions against unseen data in order to avoid overfitting. The dataset division into train/test can be done randomly or based on specific columns (ids, timestamps), depending on the problem we are facing.
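
As promised, here is a minimal Python illustration of a few of those steps with pandas and scikit-learn; the file and column names are invented for the example, and OAC performs the equivalent operations visually, as described below.

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

df = pd.read_csv('salaries.csv')  # placeholder dataset

# handle missing values: here we fill missing salaries with the median
df['salary'] = df['salary'].fillna(df['salary'].median())

# handle outliers: drop extreme salaries (e.g. the CIO) above the 99th percentile
df = df[df['salary'] < df['salary'].quantile(0.99)]

# feature scaling, so that no column dominates distance-based models
df[['salary', 'years_experience']] = MinMaxScaler().fit_transform(df[['salary', 'years_experience']])

# train/test split to validate against unseen data and avoid overfitting
train, test = train_test_split(df, test_size=0.2, random_state=42)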

How do we perform the above steps with OAC? In the OAC bundle, there is a component called Dataflow (described here), providing ETL capabilities in the same visual platform where we visualize the data.

DataFlows provides a variety of steps including:

  • CASE WHEN...THEN... statements useful to handle missing values
  • FILTERS to remove irrelevant observations and outliers and split train and test datasets.
  • COLUMN MANAGEMENT to rename, aggregate, combine, write simple transformations and aggregations useful in features scaling and error corrections.

An important topic during data cleaning is outlier identification, for which we have several options in OAC. The basic one is plotting the data and visually selecting the outliers.

OAC also offers the option to automatically label points as Outliers/Not Outliers.

The third option OAC offers is via Explain: by selecting any column in the dataset and clicking on Explain, OAC provides information about the column itself which can be useful for spotting outliers. In the example I used, we could clearly see NULL values (on the right side of the chart) and odd values in the range 0-25 while most of the values were in the range 80-100.

So far we have seen how OAC helps with the first three steps of our data science path: problem definition, data connection and cleaning. Stay tuned for the next blog posts covering feature engineering, model creation & evaluation and prediction!


Rittman Mead has today launched its new Oracle Analytics Cloud (OAC) Bootcamp. Run on OAC, the course lasts four days, covers everything you need to know in order to manage your Cloud BI platform, and assumes no prior knowledge up front.

As the course is modular, you are able to choose which days you'd like to attend. Day 1 covers an OAC overview, provisioning, systems management, integration and security. Day 2 covers RPD Modelling and Data Modeller. Day 3 is devoted to creating reports, dashboards, alerts and navigation. Day 4 covers content creation using Oracle Data Visualization.

Book here: https://www.rittmanmead.com/training-schedule/

Got a team to train? You can also have our OAC Bootcamp delivered on-site at your location. For more information and prices contact training@rittmanmead.com


In my first two blog posts of the Spark Streaming and Kafka series - Part 1 - Creating a New Kafka Connector and Part 2 - Configuring a Kafka Connector - I showed how to create a new custom Kafka Connector and how to set it up on a Kafka server. Now it is time to deliver on the promise to analyse Kafka data with Spark Streaming.

When working with Apache Spark, you can choose between one of these programming languages: Scala, Java or Python. (There is also support for Spark querying in R.) Python is admittedly the most popular, largely thanks to Python being the most popular (and easiest to learn) programming language of the three. However, the fact that Python relies on dynamic typing poses challenges with Spark integration and, in my opinion, makes Spark a less natural fit with Python than with Scala.

Spark and Scala - the Basics

Spark was developed in Scala and its look and feel resembles its mother language quite closely. In fact, before diving into Spark Streaming, I am tempted to illustrate that for you with a small example (that also nicely recaptures the basics of Spark usage):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SparkTellDifference extends App {

    // set up Spark Context
    val sparkSession = SparkSession.builder.appName("Simple Application").config("spark.master", "local[*]").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("ERROR")

    // step 0: establish source data sets
    val stringsToAnalyse: List[String] = List("Can you tell the difference between Scala & Spark?", "You will have to look really closely!")
    val stringsToAnalyseRdd: RDD[String] = sparkContext.parallelize(stringsToAnalyse)

    // step 1: split sentences into words
    val wordsList: List[String]   = stringsToAnalyse    flatMap (_.split(" "))
    val wordsListRdd: RDD[String] = stringsToAnalyseRdd flatMap (_.split(" "))

    // step 2: convert words to lists of chars, create (key,value) pairs.
    val lettersList: List[(Char,Int)]   = wordsList    flatMap (_.toList) map ((_,1))
    val lettersListRdd: RDD[(Char,Int)] = wordsListRdd flatMap (_.toList) map ((_,1))

    // step 3: count letters
    val lettersCount: List[(Char, Int)] = lettersList groupBy(_._1) mapValues(_.size) toList
    val lettersCountRdd: RDD[(Char, Int)] = lettersListRdd reduceByKey(_ + _)

    // step 4: get Top 5 letters in our sentences.
    val lettersCountTop5: List[(Char, Int)] = lettersCount sortBy(- _._2) take(5)
    val lettersCountTop5FromRdd: List[(Char, Int)] = lettersCountRdd sortBy(_._2, ascending = false) take(5) toList

    // the results
    println(s"Top 5 letters by Scala native: ${lettersCountTop5}")
    println(s"Top 5 letters by Spark: ${lettersCountTop5FromRdd}")

    // we are done
    sparkSession.stop()
}

The code starts by setting up a Spark Session and Context. Please note that Spark is being used in local mode - I do not have Spark nodes installed in my working environment. With Spark Context set up, step 0 is to establish data sources. Note that the Spark RDD is based on the Scala native List[String] value, which we parallelize. Once parallelized, it becomes a Spark native.

Step 1 splits sentences into words - much like we have seen in the typical Spark word count examples. Step 2 splits those word strings into Char lists - instead of words, let us count letters and see which letters are used the most in the given sentences. Note that Steps 1 and 2 look exactly the same, even though the first works with a Scala native value whereas the second works with a Spark RDD value. Step 2 ends with us creating the familiar (key,value) pairs that are typically used in Spark RDDs.

Step 3 shows a difference between the two - Spark's reduceByKey has no native Scala analogue, but we can replicate its behaviour with the groupBy and mapValues functions.

In step 4 we sort the data sets descending and take top 5 results. Note minor differences in the sortBy functions.

As you can see, Spark looks very Scala-like and you may have to look closely and check data types to determine if you are dealing with Scala native or remote Spark data types.

The Spark values follow the typical cycle of applying several transformations that transform one RDD into another RDD and in the end the take(5) action is applied, which pulls the results  from the Spark RDD into a local, native Scala value.

Introducing Spark Streaming

A good guide on Spark Streaming can be found here.

A quick overview of Spark Streaming with Kafka can be found here, though it alone will unlikely be sufficient to understand the Spark Streaming context - you will need to read the Spark Streaming guide as well.

Working with Spark streams is mostly similar to working with regular RDDs. Just like the RDDs, on which you apply transformations to get other immutable RDDs and then apply actions to get the data locally, Spark Streams work similarly. In fact, the transformation part looks exactly the same - you apply a transformation on a Discretized Stream (DStream) to get another DStream. For example, you can have a val words: DStream[String] that represents a stream of words. You can define another DStream with those same words in upper case as

val wordsUpper: DStream[String] = words map (_.toUpperCase)

Note that both these values represent streams - data sources where new data production might be ongoing. So if you have an incoming stream of words, you can define another data stream of the same words but in upper case. That includes the words not yet produced into the stream.

Source: https://spark.apache.org/docs/latest/streaming-programming-guide.html

(If the value words were an RDD, the wordsUpper calculation would look almost the same: val wordsUpper: RDD[String] = words map (_.toUpperCase).) However, DStreams and RDDs differ when it comes to getting the data locally - for RDDs you call actions, for DStreams it is a bit more complicated. But... let us start from the beginning.

Setting up Spark Streaming

Much like a Spark Session and Context, Spark Streaming needs to be initialised.

We start by defining Spark Config - much like for SparkSession in the simple Spark example, we specify the application name and define the nodes we are going to use - in our case - local nodes on my developer workstation. (The asterisk means that Spark can utilise all my CPU threads.)

val sparkConfig = 
  new SparkConf().setMaster("local[*]").setAppName("SparkKafkaStreamTest")

The next step is creating a Spark StreamingContext. We pass in the config defined above but also specify the Spark Streaming batch interval - 1 minute. This is the same as the production interval by our Connector set up in Kafka. But we could also define a 5 minute batch interval and get 5 records in every batch.

val sparkStreamingContext = new StreamingContext(sparkConfig, Minutes(1))

Before we proceed, we would like to disable the annoying INFO messages that Spark likes to flood us with. Spark log level is set in Spark Context but we do not have SparkContext defined, do we? We only have StreamingContext. Well, actually, upon the creation of a StreamingContext, SparkContext is created as well. And we can access it via the StreamingContext value:

sparkStreamingContext.sparkContext.setLogLevel("ERROR")

That is the Spark Streaming Context dealt with.

Setting up Access to Kafka

Setting up access to Kafka is equally straightforward. We start by configuring Kafka consumer:

val kafkaConfig = Map[String, Object](
    "client.dns.lookup" -> "resolve_canonical_bootstrap_servers_only",
    "bootstrap.servers" -> "192.168.1.13:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "kafkaSparkTestGroup",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

The parameters given here in a Scala Map are Kafka Consumer configuration parameters as described in Kafka documentation. 192.168.1.13 is the IP of my Kafka Ubuntu VM.

Although I am referring to my Kafka server by IP address, I had to add an entry to the hosts file with my Kafka server name for my connection to work:

192.168.1.13 kafka-box

The client.dns.lookup value did not have an impact on that.

The next step is specifying an array of Kafka topics - in our case that is only one topic - 'JanisTest':

val kafkaTopics = Array("JanisTest")

Getting First Data From Kafka

We are ready to initialise our Kafka stream in Spark. We pass our StreamingContext value, topics list and Kafka Config value to the createDirectStream function. We also specify our LocationStrategy value - as described here. Consumer Strategies are described here.

val kafkaRawStream: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
            sparkStreamingContext,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](kafkaTopics, kafkaConfig)
        )

What gets returned is a Spark Stream coming from Kafka. Please note that it returns Kafka Consumer record (key,value) pairs. The value part contains our weather data in JSON format. Before we proceed with any sort of data analysis, let us parse the JSON in a similar manner to the JSON parsing we did in Part 1 of this blog series. I will not cover it here, but I have created a Gist that you can have a look at.

The weatherParser function converts the JSON to a WeatherSchema case class instance - the returned value is of type DStream[WeatherSchema], where DStream is the Spark Streaming container:

val weatherStream: DStream[WeatherSchema] = 
   kafkaRawStream map (streamRawRecord => weatherParser(streamRawRecord.value))

Now our data is available for nice and easy analysis.

Let us start with the simplest - check the number of records in our stream:

val recordsCount: DStream[Long] = weatherStream.count()

The above statement deserves special attention. If you have worked with Spark RDDs, you will remember that the RDD count() function returns a Long value instead of an RDD, i.e. it is an action, not a transformation. As you can see above, count() on a DStream returns another DStream, instead of a native Scala long value. It makes sense because a stream is an on-going data producer. What the DStream count() gives us is not the total number of records ever produced by the stream - it is the number of records in the current 1 minute batch. Normally it should be 1 but it can also be empty. Should you take my word for it? Better check it yourself! But how? Certainly not by just printing the recordsCount value - all you will get is a reference to the Spark stream and not the stream content.

Displaying Stream Data

Displaying stream data looks rather odd. To display the recordsCount content, you need the following lines of code:

recordsCount.print()

...

sparkStreamingContext.start() // start the computation
sparkStreamingContext.awaitTermination() // await termination

The DStream value itself has a method print(), which is different from the Scala's print() or println() functions. However, for it to actually start printing stream content, you need to start() stream content computation, which will start ongoing stream processing until terminated. The awaitTermination() function waits for the process to be terminated - typically with a Ctrl+C. There are other methods of termination as well, not covered here. So, what you will get is recordsCount stream content printed every batch interval (1 minute in our example) until the program execution is terminated.

The output will look something like this, with a new record appearing every minute:

-------------------------------------------
Time: 1552067040000 ms
-------------------------------------------
1

-------------------------------------------
Time: 1552067100000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1552067160000 ms
-------------------------------------------
1

Notice the '...' between the recordsCount.print() and the stream start(). You can have DStream transformations following the recordsCount.print() statement and other DStream print() calls before the stream is started. Then, instead of just the count, you will get other values printed for each 1 minute batch.

You can do more than just print the DStream content on the console, but we will come to that a bit later.

Analysing Stream Data

Above we have covered all the basics - we have initialised the Spark Context and Kafka access, we have retrieved stream data and we know how to set up an ongoing print of the results for our stream batches. Before we proceed with our exploration, let us define a goal for our data analysis.

We are receiving a real-time stream of weather data. What we could analyse is the temperature change dynamics within the past 60 minutes. Note that we are receiving a new batch every minute so every minute our 60 minute window will move one step forward.

Source: https://spark.apache.org/docs/latest/streaming-programming-guide.html

What we have got is our weatherStream DStream value. First let us define a Stream window of 60 minutes (check Spark documentation for explanation on how Stream Windows work.)

val weatherStream1Hour: DStream[WeatherSchema] = weatherStream.window(Minutes(60))

The WeatherSchema case class contains many values. But all we need for our simple analysis is really just the timestamp and temperature. Let us extract just the data we need and put it in a traditional RDD (key,value) pair. And we print the result to verify it.

val weatherTemps1Hour: DStream[(String, Double)] = 
   weatherStream1Hour map (weatherRecord => 
     (weatherRecord.dateTime, weatherRecord.mainTemperature) 
   )

weatherTemps1Hour.print()

Please note that the above code should come before the sparkStreamingContext.start() call.

The result we are getting looks something like this:

-------------------------------------------
Time: 1552068480000 ms
-------------------------------------------
(08/03/2019 16:57:27,8.42)
(08/03/2019 16:57:27,8.42)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
(08/03/2019 17:06:02,8.38)
...

Notice the ellipsis at the end. Not all records get displayed if there are more than 10.

Of course, we will get a new result printed every minute. However, the latest results will be at the bottom, which means they will be hidden if there are more than 10 of them. Also note that the weather data we are getting is actually not refreshed every minute but more like every 10 minutes. Our 1 minute batch frequency does not represent the actual frequency of weather data updates. But let us deal with those problems one at a time.
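A small aside: if hiding the latest results is a nuisance, the DStream print() function also accepts the number of elements to show per batch, so something like the following would display up to 50 records instead of the default 10:

// Show up to 50 elements of each batch instead of the default 10.
weatherTemps1Hour.print(50)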

For me, vanity always comes first. Let me convert the (key,value) pair output to a nice looking narrative.

val weatherTemps1HourNarrative = weatherTemps1Hour map { 
  case(dateTime, temperature) => 
    s"Weather temperature at ${dateTime} was ${temperature}" 
}

weatherTemps1HourNarrative.print()

The result:

-------------------------------------------
Time: 1552068480000 ms
-------------------------------------------
Weather temperature at 08/03/2019 16:57:27 was 8.42
Weather temperature at 08/03/2019 16:57:27 was 8.42
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
Weather temperature at 08/03/2019 17:06:02 was 8.38
...

We are still limited to the 10 records that the DStream print() function shows by default. Also, unless we are debugging, we are almost certainly going to go further than just printing the records on the console. For that we use the DStream foreachRDD function, which works similarly to the map function but does not return any data. Instead, whatever we do with the Stream data - print it to the console, save it into a CSV file or a database - needs to take place within the foreachRDD function.

The foreachRDD Function

The foreachRDD function accepts a function as a parameter; that function receives as its input an RDD representing the current content of the DStream and deals with that content in its body.

Ok, at long last we are getting some results back from our Spark stream that we can use, that we can analyse, that we know how to deal with! Let us get carried away!

weatherTemps1Hour foreachRDD { currentRdd =>
  println(s"RDD content:\n\t${currentRdd.collect().map{case(dateTime,temperature) => s"Weather temperature at ${dateTime} was ${temperature}"}.mkString("\n\t")}")

  val tempRdd: RDD[Double] = currentRdd.map(_._2)

  val minTemp = if(tempRdd.isEmpty()) None else Some(tempRdd.min())
  val maxTemp = if(tempRdd.isEmpty()) None else Some(tempRdd.max())

  println(s"Min temperature: ${if(minTemp.isDefined) minTemp.get.toString else "not defined"}")

  println(s"Max temperature: ${if(maxTemp.isDefined) maxTemp.get.toString else "not defined"}")

  println(s"Temperature difference: ${if(minTemp.isDefined && maxTemp.isDefined) (maxTemp.get - minTemp.get).toString}\n")
}

Here we are formatting the output and getting min and max temperatures within the 60 minute window as well as their difference. Let us look at the result:

RDD content:
	Weather temperature at 08/03/2019 16:57:27 was 8.42
	Weather temperature at 08/03/2019 16:57:27 was 8.42
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:06:02 was 8.38
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:17:32 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:30:52 was 8.28
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:37:51 was 8.19
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 17:51:10 was 8.1
	Weather temperature at 08/03/2019 18:01:10 was 7.99
	Weather temperature at 08/03/2019 18:01:10 was 7.99
	Weather temperature at 08/03/2019 18:01:10 was 7.99
Min temperature: 7.99
Max temperature: 8.42
Temperature difference: 0.4299999999999997

(Now there is no Time: 1552068480000 ms header in our results printout because we are no longer using the DStream print() function.)

However, I would like my analysis to be more detailed. It is time to involve Spark DataFrames.

Kafka Stream Data Analysis with Spark DataFrames

Just like in the previous statement, I need to access the Stream data via foreachRDD, working with the currentRdd value it passes in. In fact, all the code that follows will be within the foreachRDD function block:

weatherStream1Hour foreachRDD { currentRdd =>
  ... // the following code comes here
}

First, let us create a DataFrame from an RDD. Spark RDDs and DataFrames are two quite different representations of distributed data. And yet - look how simply the conversion works:

val spark =
   SparkSession.builder.config(currentRdd.sparkContext.getConf).getOrCreate()
import spark.implicits._

val simpleDF: DataFrame = currentRdd.toDF()
simpleDF.createOrReplaceTempView("simpleDF")

This trick works because our weatherStream1Hour DStream, and consequently the currentRdd value that represents the Stream content, are based on the WeatherSchema case class (the data types are weatherStream1Hour: DStream[WeatherSchema] and currentRdd: RDD[WeatherSchema]). Therefore the currentRdd.toDF() implicit conversion works - Spark understands Scala case classes.

Once we have the DataFrame created, we can create a Temp view so we can query this DF with Spark SQL - that is what the createOrReplaceTempView function is for.

Let us start with the simplest queries - query the count(*) and the full content of the DataFrame:

val countDF: DataFrame = spark.sql("select count(*) as total from simpleDF")
countDF.show()

val selectAllDF = spark.sql("select * from simpleDF")
selectAllDF.show()

The result:

DataFrame's show() function by default only shows 20 rows, but we can always adjust that to show more or fewer. However, as we established earlier in our analysis, the weather data actually does not get updated every minute - we are..


The Analytics and Data Summit 2019 (formerly known as BIWA) is happening next week at Oracle HQ in Redwood Shores. I'm excited to participate since it's one of the best conferences for the Oracle Analytics crowd: you get three days full of content from experts as well as hints about future product developments directly from the related Product Managers!

I'll be representing Rittman Mead in two sessions:

This two-hour workshop will cover all the details of OAC: Product Overview, Instance Creation and Management, Moving from on-prem OBIEE to OAC, Data Preparation and Data Visualization, and Advanced Analytics, with interactive labs where participants can experience Data Visualization and Data Flows.

Become a Data Scientist with OAC! This session will explain how Oracle Analytics Cloud acts as an enabler for the transformation from Data Analyst to Data Scientist. Connection to the data, cleaning, transformation, and analysis will be the intermediate steps before training several machine learning models, which will then be evaluated and used to predict outcomes on unseen data. A demo will show all the steps in a real example based on a wine dataset!

There is a full list of all sessions here. You can follow the conference on Twitter with the hashtag #AnDSummit2019, and I'll be tweeting about it too as @ftisiot.

The presentations that I'm delivering will be available to download on speakerdeck.


In my previous blog post, I covered the development of a custom Kafka Source Connector, written in Scala. This blog post is about deploying that Connector. We are getting closer to analysing the stream data in Spark - as promised in the title.

If you are installing a Connector from Confluent Hub, instead of installing a custom one, please follow this guide instead.

Setting up Kafka Server

This blog post does not cover setting up a Kafka instance. However, let me give a quick overview of the environment I am using:

  • OS: Ubuntu server 18.10.
  • Kafka version - at the time of writing, the latest version is 2.1.0, so that is the one I am using. Please note that the Kafka Connector API interface is slightly different for different Kafka versions. (In fact, early in my Connector development I saw Kafka throwing strange missing function errors. It turned out I was using a slightly older API for Kafka 2.0.0 instead of the one for 2.1.0 - the SourceConnector class's `taskConfigs` function had been renamed in the latest version.)
  • Java: OpenJDK 11.0.1. It seems the OpenJDK is preferred for Kafka over the Oracle Java JDK.

I got the Kafka install file from:
http://www-eu.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

Setting up Kafka Topic

Confluent have an excellent user guide on how to set up Connectors. There is also a Quick Start guide on the Apache Kafka website, though it is much less detailed than the guide from Confluent.

The first step for me, once my Kafka instance is up and running, is to create a new topic for my Weather records. Let me call it 'JanisTest'. From Kafka root folder I execute:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JanisTest
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
JanisTest
__consumer_offsets
$

After creating it, I run the --list command to see that it is indeed created and available.

Install Connector

Copy jars from your Scala development environment to your Kafka instance. I chose to have a folder of jars, not one uber-jar with all library dependencies packaged in. But either approach should work fine.

$ cd ~/kafka
$ mkdir connectors
$ cd connectors
$ cp /VMSharedFolder/connectorJars/* .

Do not add the above folder to CLASSPATH! Instead, add it to the connect-*.properties config files. In the later versions of Kafka, the server itself manages paths to plugins.

$ cd ~/kafka/config
$ vi connect-standalone.properties

We will run our connector in standalone mode, therefore the important file for us to edit is 'connect-standalone.properties'. But we can also edit the 'connect-distributed.properties' config file if we ever decide to run Kafka on multiple nodes.

Add the following line to the connect-standalone.properties and connect-distributed.properties configuration files:

plugin.path=/home/kafka/kafka/connectors

If you need to add several connector folders, the plugin.path value is comma-separated.
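For example (the second folder here is purely hypothetical, shown only to illustrate the comma-separated format):

plugin.path=/home/kafka/kafka/connectors,/home/kafka/kafka/more-connectors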

Configure Connector

Similar to the connect-standalone.properties file, our newly installed Source Connector also requires a configuration file. Let us create one:

$ cd ~/kafka/config
$ vi connect-http-source.properties

We specify the following configuration attributes in the config file - these attributes were defined in the `HttpSourceConnectorConstants` object we discussed in my previous blog post.

name=http-source

http.url=http://api.openweathermap.org/data/2.5/weather
http.api.key=<YOUR API KEY>
http.api.params=units=metric&q=London,uk

service.name=CurrentWeatherData
topic=JanisTest

poll.interval.ms=60000
tasks.max=1
connector.class=com.rittmanmead.kafka.connector.http.HttpSourceConnector

  • name - name of the Connector.
  • http.url - the base URL for the HTTP API.
  • http.api.key - replace <YOUR API KEY> with the key you got when registering with OpenWeatherMap.
  • http.api.params - any additional parameters you would like to specify for your API request, as described in the API documentation. Follow the format name=value. In case you need to specify several attributes, separate them with '&'.
  • service.name - you can name your service.
  • topic - give the topic name that was created previously, as described in this guide.
  • poll.interval.ms - the frequency at which the API will be queried. 60000ms = 1 minute. Because the weather does not change that quickly, this polling interval is frequent enough for us. The free API tier limits us to no more than 1 query per second.
  • tasks.max - the maximum number of concurrent tasks allowed. In our case it is 1.
  • connector.class - the full class path of the SourceConnector class we have implemented, which is described in my previous blog post.

Run Connector

Because we are only running the Connector in standalone mode, there is not much config to be done. We are ready to run it. To start our Source Connector, we run the connect-standalone.sh command:

$ cd ~/kafka
$ ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-http-source.properties

The first argument passed to the connect-standalone.sh script is the worker properties configuration - there is exactly one of those. It is followed by one or more (in our case, one) connector configurations.

Take a deep breath and hit Enter.

A long log output will follow. If you get any errors, the command will return to shell prompt. Some possible errors are missing classes - if that is the case, please check that you have successfully deployed all the jars and they are registered in the connect-standalone.properties file. Also, if any of the required configuration parameters are missing in the connect-http-source.properties file, you will get an error message.

If successful, the log listing will end with something like this:

[2019-02-19 14:14:55,294] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2019-02-19 14:14:55,297] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser:110)
[2019-02-19 14:14:55,334] INFO Created connector http-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2019-02-19 14:14:55,406] INFO Setting up an HTTP service for http://api.openweathermap.org/data/2.5/weather... (com.rittmanmead.kafka.connector.http.HttpSourceTask:40)
[2019-02-19 14:14:55,466] INFO Starting to fetch from http://api.openweathermap.org/data/2.5/weather each 60000ms... (com.rittmanmead.kafka.connector.http.HttpSourceTask:47)
[2019-02-19 14:14:55,469] INFO WorkerSourceTask{id=http-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)
[2019-02-19 14:14:56,560] INFO Http return code: 200 (com.rittmanmead.kafka.connector.http.HttpSourceTask:31)
[2019-02-19 14:14:56,569] INFO Weather Schema parser: JSON text to be parsed: {"coord":{"lon":-0.13,"lat":51.51},"weather":[{"id":802,"main":"Clouds","description":"scattered clouds","icon":"03d"}],"base":"stations","main":{"temp":10.09,"pressure":1019,"humidity":61,"temp_min":9,"temp_max":11},"visibility":10000,"wind":{"speed":4.6,"deg":250},"clouds":{"all":44},"dt":1550582400,"sys":{"type":1,"id":1414,"message":0.0038,"country":"GB","sunrise":1550559964,"sunset":1550597002},"id":2643743,"name":"London","cod":200} (com.rittmanmead.kafka.connector.http.HttpSourceTask:273)
[2019-02-19 14:14:57,481] INFO JSON parsed class content: WeatherSchema(Coord(-0.13,51.51),List(WeatherAtom(802.0,Clouds,scattered clouds,03d)),stations,Main(10.09,1019.0,61.0,9.0,11.0),10000.0,Wind(4.6,250.0),Clouds(44.0),1.5505824E9,Sys(1.0,1414.0,0.0038,GB,1.550559964E9,1.550597002E9),2643743.0,London,200.0) (com.rittmanmead.kafka.connector.http.HttpSourceTask:283)
[2019-02-19 14:14:57,499] INFO Got 1 results for CurrentWeatherData (com.rittmanmead.kafka.connector.http.HttpSourceTask:75)

Some of the above messages were generated by our Connector directly, like the last line above - 'Got 1 results for CurrentWeatherData'. Others are generated by Kafka itself.

Check Connector Output

Our ultimate goal is to analyse a Kafka Stream with Spark in Scala. However, that will have to wait till my next blog post. For now we can check if the topic is being populated. Please note that our polling interval is 1 minute - the topic will not be flooded with records right away.

$ cd ~/kafka
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic JanisTest --from-beginning

If all is going well, you should see Weather records in the console output, generated according to our custom-defined schema.

Quite easy, was it not? Certainly setting up a Kafka connector is easier than developing one.

The next step will be analysing this data from Spark. But that will be my next blog post.

More Kafka and Spark, please!

Hello, world!

Having joined Rittman Mead more than 6 years ago, the time has come for my first blog post. Let me start by standing on the shoulders of blogging giants, revisiting Robin's old blog post Getting Started with Spark Streaming, Python, and Kafka.

The blog post was very popular, touching on the subjects of Big Data and Data Streaming. To put my own twist on it, I decided to:

  • not use Twitter as my data source, because there surely must be other interesting data sources out there,
  • use Scala, my favourite programming language, to see how different the experience is from using Python.

Why Scala?

Scala is admittedly more challenging to master than Python. However, because Scala compiles into Java bytecode, it can be used pretty much anywhere where Java is being used. And Java is being used everywhere. Python is arguably even more widely used than Java, however it remains a dynamically typed scripting language that is easy to write in but can be hard to debug.

Is there a case for using Scala instead of Python for the job? Both Spark and Kafka were written in Scala (and Java), hence they should get on like a house on fire, I thought. Well, we are about to find out.

My data source: OpenWeatherMap

When it comes to finding sample data sources for data analysis, the selection out there is amazing. At the time of writing, Kaggle offers 14,470 freely available datasets, many of them in easy-to-digest formats like CSV and JSON. However, when it comes to real-time sample data streams, the selection is quite limited. Twitter is usually the go-to choice - easily accessible and well documented. Too bad I decided not to use Twitter as my source.

Another alternative is the Wikipedia Recent changes stream. Although in the stream schema there are a few values that would be interesting to analyse, overall this stream is more boring than it sounds - the text changes themselves are not included.

Fortunately, I came across the OpenWeatherMap real-time weather data website. They have a free API tier, limited to 1 request per second, which is quite enough for tracking changes in the weather. Their different API schemas return plenty of numeric and textual data, all interesting for analysis. The APIs work in a very standard way - first you apply for an API key. With the key you can query the API with a simple HTTP GET request (apply for your own API key instead of using the sample one - it is easy).

This request

https://samples.openweathermap.org/data/2.5/weather?q=London,uk&appid=b6907d289e10d714a6e88b30761fae22

gives the following result:

{
  "coord": {"lon":-0.13,"lat":51.51},
  "weather":[
    {"id":300,"main":"Drizzle","description":"light intensity drizzle","icon":"09d"}
  ],
  "base":"stations",
  "main": {"temp":280.32,"pressure":1012,"humidity":81,"temp_min":279.15,"temp_max":281.15},
  "visibility":10000,
  "wind": {"speed":4.1,"deg":80},
  "clouds": {"all":90},
  "dt":1485789600,
  "sys": {"type":1,"id":5091,"message":0.0103,"country":"GB","sunrise":1485762037,"sunset":1485794875},
  "id":2643743,
  "name":"London",
  "cod":200
}

Getting data into Kafka - considering the options

There are several options for getting your data into a Kafka topic. If the data will be produced by your application, you should use the Kafka Producer Java API. You can also develop Kafka Producers in .Net (usually C#), C, C++, Python, Go. The Java API can be used by any programming language that compiles to Java bytecode, including Scala. Moreover, there are Scala wrappers for the Java API: skafka by Evolution Gaming and Scala Kafka Client by cakesolutions.
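For illustration only - this is not the route taken in this post - a minimal Scala sketch of producing a record with the plain Java Producer API might look like the following (the broker address, topic name and payload are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object WeatherProducerSketch extends App {
  // Minimal producer configuration - the broker address is a placeholder.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Send one record to a hypothetical topic, then clean up.
  producer.send(new ProducerRecord[String, String]("weather-topic", "London", """{"temp": 8.42}"""))
  producer.close()
}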

OpenWeatherMap is not my application, and what I need is integration between its API and Kafka. I could cheat and implement a program that would consume OpenWeatherMap's records and produce records for Kafka. The right way of doing that, however, is by using Kafka Source connectors, for which there is an API: the Connect API. Unlike Producers, which can be written in many programming languages, for Connectors I could only find a Java API, and I could not find any nice Scala wrappers for it. On the upside, Confluent's Connector Developer Guide is excellent, rich in detail though not quite a step-by-step cookbook.

However, before we decide to develop our own Kafka connector, we must check for existing connectors. The first place to go is Confluent Hub. There are quite a few connectors there, complete with installation instructions, ranging from connectors for particular environments like Salesforce, SAP, IRC, Twitter to ones integrating with databases like MS SQL, Cassandra. There is also a connector for HDFS and a generic JDBC connector. Is there one for HTTP integration? Looks like we are in luck: there is one! However, this connector turns out to be a Sink connector.

Ah, yes, I should have mentioned - there are two flavours of Kafka Connectors: the Kafka-inbound are called Source Connectors and the Kafka-outbound are Sink Connectors. And the HTTP connector in Confluent Hub is Sink only.

Googling for Kafka HTTP Source Connectors gives few interesting results. The best I could find was Pegerto's Kafka Connect HTTP Source Connector. Contrary to what the repository name suggests, the implementation is quite domain-specific, for extracting Stock prices from particular web sites and has very little error handling. Searching Scaladex for 'Kafka connector' does yield quite a few results but nothing for http. However, there I found Agoda's nice and simple Source JDBC connector (though for a very old version of Kafka), written in Scala. (Do not use this connector for JDBC sources, instead use the one by Confluent.) I can use this as an example to implement my own.

Creating a custom Kafka Source Connector

The best place to start when implementing your own Source Connector is the Confluent Connector Development Guide. The guide uses JDBC as an example. Our source is an HTTP API, so early on we must establish whether our data source is partitioned, whether we need to manage offsets for it, and what the schema is going to look like.

Partitions

Is our data source partitioned? A partition is a division of source records that usually depends on the source medium. For example, if we are reading our data from CSV files, we can consider the different CSV files to be a natural partition of our source data. Another example of partitioning could be database tables. But in both cases the best partitioning approach depends on the data being gathered and its usage. In our case, there is only one API URL and we are only ever requesting current data. If we were to query weather data for different cities, that would be a very good partitioning - by city. Partitioning would allow us to parallelise the Connector data gathering - each partition would be processed by a separate task. To make my life easier, I am going to have only one partition.

Offsets

Offsets are for keeping track of the records already read and the records yet to be read. An example of that is reading the data from a file that is continuously being appended - there can be rows already inserted into a Kafka topic and we do not want to process them again, to avoid duplication. Why would that be a problem? Surely, when going through a source file row by row, we know which row we are looking at. Anything above the current row is processed, anything below - new records. Unfortunately, most of the time it is not as simple as that: first of all, Kafka supports concurrency, meaning there can be more than one Task busy processing Source records. Another consideration is resilience - if a Kafka Task process fails, another process will be started up to continue the job. This can be an important consideration when developing a Kafka Source Connector.

Is it relevant for our HTTP API connector? We are only ever requesting current weather data. If our process fails, we may miss some time periods, but we cannot recover them later on. Offset management is not required for our simple connector.

So that is Partitions and Offsets dealt with. Can we make our lives just a bit more difficult? Fortunately, we can. We can create a custom Schema and then parse the source data to populate a Schema-based Structure. But we will come to that later. First let us establish the framework for our Source Connector.

Source Connector - the Framework

The starting point for our Source Connector are two Java API classes: SourceConnector and SourceTask. We will put them into separate .scala source files but they are shown here together:

import org.apache.kafka.connect.source.{SourceConnector, SourceTask}

class HttpSourceConnector extends SourceConnector {...}
class HttpSourceTask extends SourceTask {...}

These two classes will be the basis for our Source Connector implementation:

  • HttpSourceConnector represents the Connector process management. Each Connector process will have only one SourceConnector instance.
  • HttpSourceTask represents the Kafka task doing the actual data integration work. There can be one or many Tasks active for an active SourceConnector instance.

We will have some additional classes for config and for HTTP access. But first let us look at each of the two classes in more detail.

SourceConnector class

SourceConnector is an abstract class that defines an interface that our HttpSourceConnector needs to adhere to. The first function we need to override is config:

  private val configDef: ConfigDef =
      new ConfigDef()
          .define(HttpSourceConnectorConstants.HTTP_URL_CONFIG, Type.STRING, Importance.HIGH, "Web API Access URL")
          .define(HttpSourceConnectorConstants.API_KEY_CONFIG, Type.STRING, Importance.HIGH, "Web API Access Key")
          .define(HttpSourceConnectorConstants.API_PARAMS_CONFIG, Type.STRING, Importance.HIGH, "Web API additional config parameters")
          .define(HttpSourceConnectorConstants.SERVICE_CONFIG, Type.STRING, Importance.HIGH, "Kafka Service name")
          .define(HttpSourceConnectorConstants.TOPIC_CONFIG, Type.STRING, Importance.HIGH, "Kafka Topic name")
          .define(HttpSourceConnectorConstants.POLL_INTERVAL_MS_CONFIG, Type.STRING, Importance.HIGH, "Polling interval in milliseconds")
          .define(HttpSourceConnectorConstants.TASKS_MAX_CONFIG, Type.INT, Importance.HIGH, "Kafka Connector Max Tasks")
          .define(HttpSourceConnectorConstants.CONNECTOR_CLASS, Type.STRING, Importance.HIGH, "Kafka Connector Class Name (full class path)")

  override def config: ConfigDef = configDef

This is validation for all the required configuration parameters. We also provide a description for each configuration parameter, which will be shown in the missing-configuration error message.

HttpSourceConnectorConstants is an object where config parameter names are defined - these configuration parameters must be provided in the connector configuration file:

object HttpSourceConnectorConstants {
  val HTTP_URL_CONFIG               = "http.url"
  val API_KEY_CONFIG                = "http.api.key"
  val API_PARAMS_CONFIG             = "http.api.params"
  val SERVICE_CONFIG                = "service.name"
  val TOPIC_CONFIG                  = "topic"
  val TASKS_MAX_CONFIG              = "tasks.max"
  val CONNECTOR_CLASS               = "connector.class"

  val POLL_INTERVAL_MS_CONFIG       = "poll.interval.ms"
  val POLL_INTERVAL_MS_DEFAULT      = "5000"
}

Another simple function to be overridden is taskClass - for the SourceConnector class to know its corresponding SourceTask class.

  override def taskClass(): Class[_ <: SourceTask] = classOf[HttpSourceTask]

The last two functions to be overridden here are start and stop. These are called upon the creation and termination of a SourceConnector instance (not a Task instance). JavaMap here is an alias for java.util.Map - a Java Map, not to be confused with the native Scala Map, which cannot be used here. (If you are a Python developer, a Map in Java/Scala is similar to a Python dictionary, but strongly typed.) The interface requires Java data structures, but that is fine - we can convert between the two. By far the biggest problem here is the assignment of the connectorConfig variable - we cannot have a functional-programming-friendly immutable value here. The variable is defined at the class level:

  private var connectorConfig: HttpSourceConnectorConfig = _

and is set in the start function and then referred to in the taskConfigs function further down. This does not look pretty in Scala. Hopefully somebody will write a Scala wrapper for this interface.

Because there is no logout/shutdown/sign-out required for the HTTP API, the stop function just writes a log message.

  override def start(connectorProperties: JavaMap[String, String]): Unit = {
    Try (new HttpSourceConnectorConfig(connectorProperties.asScala.toMap)) match {
      case Success(cfg) => connectorConfig = cfg
      case Failure(err) => connectorLogger.error(s"Could not start Kafka Source Connector ${this.getClass.getName} due to error in configuration.", new ConnectException(err))
    }
  }

  override def stop(): Unit = {
    connectorLogger.info(s"Stopping Kafka Source Connector ${this.getClass.getName}.")
  }

HttpSourceConnectorConfig is a thin wrapper class for the configuration.

We are almost done here. The last function to be overridden is taskConfigs. This function is in charge of producing (potentially different) configurations for different Source Tasks. In our case, there is no reason for the Source Task configurations to differ. In fact, our HTTP API will benefit little from parallelism, so, to keep things simple, we can assume the number of tasks always to be 1.

  override def taskConfigs(maxTasks: Int): JavaList[JavaMap[String, String]] = List(connectorConfig.connectorProperties.asJava).asJava

The name of the taskConfigs function was changed in Kafka version 2.1.0 - please take that into account when using this code with older Kafka versions.

Source Task class

In a similar manner to the Source Connector class, we implement the Source Task abstract class. It is only slightly more complex than the Connector class.

Just like for the Connector, there are start and stop functions to be overridden for the Task.

Remember the taskConfigs function from above? This is where task configuration ends up - it is passed to the Task's start function. Also, similarly to the Connector's start function, we parse the connection properties with HttpSourceTaskConfig, which is the same as HttpSourceConnectorConfig - configuration for Connector and Task in our case is the same.

We also set up the Http service that we are going to use in the poll function - we create an instance of the WeatherHttpService class. (Please note that start is executed only once, upon the creation of the task and not every time a record is polled from the data source.)

  override def start(connectorProperties: JavaMap[String, String]): Unit = {
    Try(new HttpSourceTaskConfig(connectorProperties.asScala.toMap)) match {
      case Success(cfg) => taskConfig = cfg
      case Failure(err) => taskLogger.error(s"Could not start Task ${this.getClass.getName} due to error in configuration.", new ConnectException(err))
    }

    val apiHttpUrl: String = taskConfig.getApiHttpUrl
    val apiKey: String = taskConfig.getApiKey
    val apiParams: Map[String, String] = taskConfig.getApiParams

    val pollInterval: Long = taskConfig.getPollInterval

    taskLogger.info(s"Setting up an HTTP service for ${apiHttpUrl}...")
    Try( new WeatherHttpService(taskConfig.getTopic, taskConfig.getService, apiHttpUrl, apiKey, apiParams) ) match {
      case Success(service) =>  sourceService = service
      case Failure(error) =>    taskLogger.error(s"Could not establish an HTTP service to ${apiHttpUrl}")
                                throw error
    }

    taskLogger.info(s"Starting to fetch from ${apiHttpUrl} each ${pollInterval}ms...")
    running = new JavaBoolean(true)
  }

The Task also has the stop function. But, just like for the Connector, it does not do much, because there is no need to sign out from an HTTP API session.

Now let us see how we get the data from our HTTP API - by overriding the poll function.

The fetchRecords function uses the sourceService HTTP service initialised in the start function. sourceService's sourceRecords function requests data from the HTTP API.

  override def poll(): JavaList[SourceRecord] = this.synchronized { if(running.get) fetchRecords else null }

  private def fetchRecords: JavaList[SourceRecord] = {
    taskLogger.debug("Polling new data...")

    val pollInterval = taskConfig.getPollInterval
    val startTime    = System.currentTimeMillis

    val fetchedRecords: Seq[SourceRecord] = Try(sourceService.sourceRecords) match {
      case Success(records)                    => if(records.isEmpty) taskLogger.info(s"No data from ${taskConfig.getService}")
                                                  else taskLogger.info(s"Got ${records.size} results for ${taskConfig.getService}")
                                                  records

      case Failure(error: Throwable)           => taskLogger.error(s"Failed to fetch data for ${taskConfig.getService}: ", error)
                                                  Seq.empty[SourceRecord]
    }

    val endTime     = System.currentTimeMillis
    val elapsedTime = endTime - startTime

    if(elapsedTime < pollInterval) Thread.sleep(pollInterval - elapsedTime)

    fetchedRecords.asJava
  }

Phew - that is the interface implementation done. Now for the fun part...

Requesting data from OpenWeatherMap's API

The fun part is rather straightforward. We use the scalaj.http library to issue a very simple HTTP request and get a response.

Our WeatherHttpService implementation will have two functions:

  • httpServiceResponse that will format the request and get data from the API
  • sourceRecords that will parse the Schema and wrap the result within the Kafka SourceRecord class.

Please note that error handling takes place in the fetchRecords function above.

    override def sourceRecords: Seq[SourceRecord] = {
        val weatherResult: HttpResponse[String] = httpServiceResponse
        logger.info(s"Http return code: ${weatherResult.code}")
        val record: Struct = schemaParser.output(weatherResult.body)

        List(
            new SourceRecord(
                Map(HttpSourceConnectorConstants.SERVICE_CONFIG -> serviceName).asJava, // partition
                Map("offset" -> "n/a").asJava, // offset
                topic,
                schemaParser.schema,
                record
            )
        )
    }

    private def httpServiceResponse: HttpResponse[String] = {

        @tailrec
        def addRequestParam(accu: HttpRequest, paramsToAdd: List[(String, String)]): HttpRequest = paramsToAdd match {
            case (paramKey,paramVal) :: rest => addRequestParam(accu.param(paramKey, paramVal), rest)
            case Nil => accu
        }

        val baseRequest = Http(apiBaseUrl).param("APPID",apiKey)
        val request = addRequestParam(baseRequest, apiParams.toList)

        request.asString
    }

Parsing the Schema

Now the last piece of the puzzle - our Schema parsing class.

The short version of it, which would do just fine, is just 2 lines of class (actually - object) body:

object StringSchemaParser extends KafkaSchemaParser[String, String] {
    override val schema: Schema = Schema.STRING_SCHEMA
    override def output(inputString: String) = inputString
}

Here we say we just want to use the pre-defined STRING_SCHEMA value as our schema definition, and we pass inputString straight to the output without any alteration.

Looks too easy, does it not? Schema parsing could be a big part of Source Connector implementation. Let us implement a proper schema parser. Make sure you read the Confluent Developer Guide first.

Our schema parser will be encapsulated in the WeatherSchemaParser object. KafkaSchemaParser is a trait with two type parameters - the inbound and outbound data types. Here they indicate that the Parser receives data in String format and produces a Kafka Struct value as its result.

object WeatherSchemaParser extends KafkaSchemaParser[String, Struct]
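The KafkaSchemaParser trait itself is not shown in this excerpt; judging by the members overridden in the two parser objects, a plausible sketch of it (not the post's exact code) would be:

import org.apache.kafka.connect.data.Schema

// Hedged sketch of the trait implied by the overrides above: IN is the raw payload type
// coming from the HTTP response, OUT is what ends up inside the Kafka SourceRecord.
trait KafkaSchemaParser[IN, OUT] {
  val schema: Schema
  def output(input: IN): OUT
}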

The first step is to create a schema value with the SchemaBuilder. Our schema is rather large, therefore I will skip most fields. The field names given are a reflection of the hierarchy structure in the source JSON. What we are aiming for is a flat, table-like structure - a likely Schema creation scenario.

For JSON parsing we will be using the Scala Circe library, which in turn is based on the Scala Cats library. (If you are a Python developer, you will see that Scala JSON parsing is a bit more involved - this might be an understatement - but, on the flip side, you can be sure about the result you are getting out of it.)

    override val schema: Schema = SchemaBuilder.struct().name("weatherSchema")
        .field("coord-lon", Schema.FLOAT64_SCHEMA)
        .field("coord-lat", Schema.FLOAT64_SCHEMA)

        .field("weather-id", Schema.FLOAT64_SCHEMA)
        .field("weather-main", Schema.STRING_SCHEMA)
        .field("weather-description", Schema.STRING_SCHEMA)
        .field("weather-icon", Schema.STRING_SCHEMA)
        
        // ...
        
        .field("rain", Schema.FLOAT64_SCHEMA)
        
        // ...

Next we define case classes, into which we will be parsing the..
