
Kafka Connect is an API and ecosystem of 3rd party connectors that enables Kafka to be easily integrated with other heterogeneous systems without having to write any extra code. This blog focuses on a use case extending the Kongo IoT application to stream events from Kafka to Apache Cassandra using a Kafka Connect Cassandra Sink.

For an introduction to Kafka Connect see Apache Kafka Connect Architecture Overview.

Here’s the Kongo code and sample connect property files for this blog.

1. The Problem: Streaming Kongo Events to Cassandra

In the previous blog (Apache Kafka “Kongo” Part 3: Kafkafying Kongo – Serialization, One or Many topics, Event Order Matters) we started the “Kafkification” process of the Kongo IoT application by adding some initial Kafka producers, topics and consumers for Sensor and RFID events. This enabled the rules to be checked for sensor events (for Goods in warehouses and trucks), and for RFID load events (to check that Goods are allowed to be transported on the same truck). However, we didn’t do anything as a result of these checks.

Imagine that we have a business requirement to keep track of every violation during the storage and transportation of Goods (for real-time notification), and then when Goods are eventually delivered (and potentially for an arbitrary period after delivery) check for violations for each delivered Goods (for auditing, quality control, process improvement etc).

Because Kafka uses an immutable log store we could potentially do all this in Kafka “for free”, with appropriate retention policy settings. Nevertheless, let’s assume we need to persist the violation events in some other system. In practice Kafka may have to be integrated with existing enterprise systems anyway (particularly if it’s being used as an integration platform!), so this use case gives us an excuse to investigate how events can be sent from Kafka to another application.

To visualise the high level event flow in the Kongo application we’ll use a Sankey Diagram. Sankey Diagrams visualise flows in systems (they have been used for everything from steam engines to Netflix). You read them from left to right and the vertical dimension is proportional to quantity. The following diagram shows relative event flows in Kongo assuming we add new Kafka producers for Sensor and RFIDLoad Checks that feed to a single new Kafka violations topic, and assumes that every check results in a violation event:

Sankey Diagram built online with http://sankeymatic.com/build/

Notice that the number of output events is much higher than the number of input events. Each input event can produce up to 100 output events, exhibiting event “amplification”: for every input event there can potentially be an avalanche of new events produced. Both the event system (i.e. Apache Kafka) and any external Sink therefore need to be highly scalable. Luckily, as Kafka automatically provides buffering, the external system only has to keep up with the average flow rate rather than the peaks. Apache Cassandra is designed for high write rates and is linearly scalable, so it’s a good choice as a Sink.

2. The Solution: Kafka Connect

Having decided to write events from Kafka topics to Cassandra, how do we do it? Well, we could write a custom Kafka consumer that reads events from a topic and writes them to Cassandra. Or we could use the part of the Kafka architecture that is specifically designed for scalable and reliable movement of data between Kafka and third party systems, Kafka Connect. Kafka Connect featured (immediately below the Kafka logo) in the first Kafka blog:

Pick‘n’Mix: Cassandra, Spark, Zeppelin, Elassandra, Kibana, & Kafka

What’s significant to realise is that Apache Kafka provides the Kafka Connect API, but it only comes with one basic example file system connector. Useful connectors are provided by 3rd parties to connect to specific systems, or you can write your own connector. So if you want to connect Kafka to system X (say Cassandra), you have to find or write connectors for that specific system, and in the direction you want the data to go: “Source connectors” pull data from an external system (the Source) and write it to Kafka topics; “Sink connectors” read data from Kafka topics and push it to an external system (the Sink). Each connector flavour is unidirectional, you can’t go against the flow. Here’s an even simpler diagram showing the high-level Kafka Connect architecture with Source (green) and Sink (blue) data flows:

High Level Kafka Connect Architecture showing Source and Sink Flows

For an introduction to the Kafka Connect Architecture (covering Source and Sink Connectors; Connectors, Plugins, Tasks and Workers; Clusters; and Converters) please see the related blog “Apache Kafka Connect Architecture Overview”.

3. Connecting Kongo to Cassandra

How did we get Kongo working with Kafka connect? The steps taken included: Selecting a Kafka Cassandra Connector; getting the simple Kafka File Sink connector working (standalone); adding a Violations topic to Kongo; connecting Kongo to the file Sink connector; connecting Kongo to the Kafka Cassandra connector (standalone); production deployment with distributed workers.

3.1 We’re going on a Kafka Cassandra Connector Hunt!

One of the challenges with understanding and using Kafka Connect is that only the Connect API is provided by Apache Kafka, and the Apache documentation is therefore limited to the API. A quick Google search suggested there were at least a few candidates for a Kafka Cassandra Connector. This was good, otherwise the rest of the blog would have been about writing one from scratch. What was I looking for in a connector? Not much really!? Here’s my wishlist (yours may be different):

  • A Kafka Cassandra Sink Connector (as we want to write data to Cassandra)
  • Open Source, with an Apache License
  • Works with recent versions of Apache Kafka (1.1.0) and Cassandra (3.11)
  • Shows some recent maintenance activity
  • Works with the minimal amount of extra software
  • Has reasonable documentation
  • Has at least one external article describing or reviewing it
  • Has some documented examples
  • and is maybe certified for Kafka, Cassandra or both (is that a thing?)

To cut a longish story short, we settled on a connector from Landoop, now part of their Lenses platform (see all the connectors that were found in the resources section at the end).

This may seem like an odd choice, as even though it is open source, in theory, it requires you to run both their Lenses platform and a Schema Registry service. However,  after a bit of research, I had a suspicion that it was possible to run the Landoop Kafka Cassandra connector with only the default Apache Kafka connect functionality. See How to use Kafka Connect for Cassandra without Confluent, Kafka Connect – Import Export for Apache Kafka, and “To install Kafka Connect outside of the Lenses Development Docker follow the instructions from Kafka.”

What do you need to install? Assuming we have Kafka and Cassandra installed, you also need to download the Kafka Cassandra Connector and download Stream Reactor (for the connector property files).

However, before jumping headfirst into the Kafka Cassandra Sink connector let’s dip our toes in the water with the Apache Kafka built-in example file connector to get some experience with configuring a connector, modifying Kongo and getting them to work together.

3.2 Kafka File Connector

The Kafka file connector is simple, comes with Apache Kafka, and you can test it out just using the Kafka command line tools.  I followed the instructions for the File Sink Connector here. The File Sink Connector will simply read records from a Kafka topic and append them to a file.

If you have Apache Kafka installed and running, that’s all you’ll need to try it out. You need to find two property files, both located in config. The property file for the single (standalone) worker is called connect-standalone.properties, and the connector-specific property file is called connect-file-sink.properties. Copy them and make some changes. The connector property file is the simplest and has lines to: set the connector name, the connector class, the maximum number of tasks (1 in this case), the name of the file that will act as the sink (where records will be appended), and the name of the Kafka topic that records will be read from:

name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test-sink.txt
topics=connect-test

The worker property file is longer. It has settings for the Kafka bootstrap servers (which you will need to set if you are running a real Kafka cluster) and Converters. The file only needs two changes from the defaults for this example. Change the default key and value converters from json.JsonConverter to storage.StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

To start Kafka connect with the worker and sink property files, run connect-standalone from the command line, with the two property files as the arguments. This starts a Kafka connect worker which in turn starts a connector and a task:

> bin/connect-standalone.sh connect-standalone.properties connect-file-sink.properties

This produces lots of useful info messages and eventually confirmation that a connector and a worker sink task have been started. From now on the worker will be continuously polling the connect-test topic for records, and passing them to the file sink task which will append them to the /tmp/test-sink.txt file. You can check that it’s working by writing some random data to the topic using the console producer like this:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test
>one two three
>I live up a tree

And the file now contains the lines:

one two three
I live up a tree

What if you don’t change the key and value converters and instead leave them as the JSON defaults? Then the worker throws an exception and dies. Why is this? It is expecting JSON formatted data on the Kafka topic. To try JSON data, disable the key and value converter schemas:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

You can then send data to the topic which the file sink connector can interpret as JSON:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test
>{"message":"one two three"}
>{"message":"I live up a tree"}

And the file contains the new lines:

{message=one two three}
{message=I live up a tree}

Note that for this example we have not used a Kafka topic key (just a value).

3.3 Adding a Violations Topic to Kongo

The Kongo code was modified to write violation messages to a violation topic in preparation for connecting the topic to Cassandra with a sink connector. As with previous topics we needed classes for the event type and serialization: ViolationEvent and ViolationEventSerializer. I initially modelled these on the RFIDEventSerializer which formatted multiple event fields as comma separated Strings. Unlike the other event types we don’t need a ViolationEventConsumer as we are only writing data to the topics, not consuming it in the application itself.

The main changes were in Goods. I added a new KafkaProducer for ViolationEvent. Recall that there are two sorts of violations in Kongo. The first is if a Goods in a location (warehouse or truck) has any rules violated due to a sensor event value being out of range. If a violation is detected then a new ViolationEvent is created and sent to the topic, with a key being the Goods Id, and the value being a String with the violation category (“sensor”) and details of all the rules that were violated and why.
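As a rough sketch of what sending such a violation might look like (hedged: the producer variable, the value format and the helper names here are illustrative assumptions, not the actual Kongo code, which uses a ViolationEvent object and its serializer):

// Hypothetical sketch: Goods Id as the Kafka key, "category: details" String as the value
ProducerRecord<String, String> violationRecord =
        new ProducerRecord<>("violations-topic", goodsId, "sensor: " + violationDetails);
violationProducer.send(violationRecord);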

The second violation event may occur as a result of an RFID load event. If a Goods is loaded onto a truck and there is a co-location violation with any of the Goods already loaded (a conflict of categories allowed to be transported together), a ViolationEvent is again created and sent, with the key equal to the Goods loaded, a violation category of “location” and a String containing the details.

While adding the new use case (producing notifications and writing them to Cassandra for every Goods violation during storage and transport), I noticed that the original version didn’t check or produce warnings for the Goods already loaded on trucks, so this check was added. As the Sankey diagrams in the introduction revealed, a side-effect of this is that there may be a large number of violation events produced for a single RFID load event.

3.4 Connecting Kongo to a File Sink

We can now ramp up our experiments and see if we can get the Kafka File Sink connector working with the new Kongo violations topic to write violations to a file. To do this, just change the topic name in the connect-file-sink.properties file to the new violations topic, and check that the converters are set to StringConverter in connect-standalone.properties:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Start the connect worker as above and then start the Kongo application (from KafkaRun). Check the contents of the sink file and you’ll see the violation messages.

3.5 Connecting Kongo to a Cassandra Sink

Now let’s try and write the violation events to Cassandra. What do you need to do? Get the connector and the property files, modify the property files, and run the connector.

The most important links are as follows (see resources below for more):

Copy/move the kafka-connect-cassandra jar file to libs, and the connector property file, cassandra-sink.properties, to config.

One of the major issues to consider is what data format to use, as there seem to be a number of options, and some of them require a Schema Registry service. The simplest approach seemed to be JSON. See the No Schema and a JSON payload instructions for setting the converters in the worker property file (just use a copy of the one used for the file connector worker).  
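As a hedged example, the worker property file settings for schema-less JSON are along these lines (check the connector documentation for the exact values):

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false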

To try this out without an existing Cassandra cluster running, you can set up a local database with these instructions. The Cassandra connector Sink expects a keyspace and table to exist to write the data into so you can create them first with the cqlsh command line:

CREATE KEYSPACE kongo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;

CREATE TABLE kongo.violations (
  goods text,
  error text,
  PRIMARY KEY (goods, error)
);

Table design is important in Cassandra, and you need to think about what queries you want to use. We assume that for this use case we will want to query if a particular Goods Id has any violations, and find all violations for a Goods Id. This means that the partition key will be the Goods Id. But given that there can be multiple violations for a Goods during its storage and transportation lifetime, we need another key as well, making a compound primary key. The additional column is the clustering key and we use the complete error String message.  This allows a query like this:

select error from violations where goods='GoodsId';

This returns all the errors for GoodsId.

Now configure the cassandra-sink.properties file with Connector class, topic, and Cassandra settings:

connect.progress.enabled=true
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=violations-topic
connect.cassandra.kcql=INSERT INTO violations SELECT goods, error from violations-topic
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=kongo
connect.cassandra.username=me
connect.cassandra.password=nottelling

Note the kcql line. KCQL stands for Kafka Connect Query Language (not to be confused with KSQL which was designed for Kafka stream processing). This tells the connector how to map the Kafka topic data to the Cassandra table data. The topic key is not used by the connector, just the topic value. The KCQL above tells the connector to read data from violations-topic, parse the value and find the goods and error fields, and then write the goods and error values to the goods and error columns of the Cassandra table, called violations in the kongo keyspace.  This requires that the..


Motivation

When examining whether Cassandra is a good fit for your needs, it is good practice to stress test Cassandra using a workload that looks similar to the expected workload in Production.

In the past we have examined the richness of features using YAML profiles in Cassandra’s stress tool – if you haven’t seen the previous post or are unfamiliar with YAML profiles in Cassandra stress, I’d recommend checking it out now.

YAML profiles are all fine and dandy when it comes to mixed or general workloads using SizeTieredCompactionStrategy (STCS) or LeveledCompactionStrategy (LCS), but sometimes we may want to model a time series workload using TimeWindowCompactionStrategy (TWCS). How would we do that with the current options available to us in stress? Ideally, we would be able to do such a thing without having to schedule cassandra-stress instances every X minutes.

Native functions

As it turns out, Cassandra has a native function now() that returns the current time as a timeuuid, which is a unique representation of time. Cassandra also ships with the function toTimestamp() that accepts a timeuuid. Putting the two together, we are able to obtain the following result:
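For example, running something like this in cqlsh (an illustrative query against the system.local table) shows the timeuuid and its timestamp conversion side by side:

SELECT now() AS now_timeuuid, toTimestamp(now()) AS now_timestamp FROM system.local;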

So we can use that to our advantage in a YAML profile:

table_definition: |
  CREATE TABLE twcstest (
    id text,
    time timestamp,
    metric int,
    value blob,
    PRIMARY KEY((id), time)
  ) WITH CLUSTERING ORDER BY (time DESC)
    AND compaction = { 'class':'TimeWindowCompactionStrategy', 'compaction_window_unit':'MINUTES', 'compaction_window_size':'20' }
    AND comment='A table to see what happens with TWCS & Stress'

columnspec:
  - name: id
    size: fixed(64)
    population: uniform(1..1500M)
  - name: time
    cluster: fixed(288)
  - name: value
    size: fixed(50)

queries:
  putindata:
    cql: insert into twcstest (id, time, metric, value) VALUES (?, toTimestamp(now()), ?, ?)

Based on that YAML above, we can now insert time series data as part of our stress. Additionally, please be aware that the compaction_window_unit property has been deliberately kept much smaller than is typical of a normal production compaction strategy!

cassandra-stress user profile=stressspectwcs.yaml n=50000000 cl=QUORUM ops\(putindata=1\) -node file=nodelist.txt -rate threads=500 -log file=insert.log -pop seq=1..1500M

The only snag to be aware of is that stress will insert timestamps rapidly, so you may want to tweak the values a little to generate suitably sized partitions with respect to your production workload.

That’s great, now how do I select data?

Well, intuitively we would just make use of the same helpful native functions that got us out from the tight spot before. So we may try this:

queries:
  putindata:
    cql: insert into twcstest (id, time, metric, value) VALUES (?, toTimestamp(now()), ?, ?)
  simple1:
    cql: select * from twcstest where id = ? and time <= toTimestamp(now()) and time >= hmmm….

We appear to be a little stuck because selects may not be as straightforward as we had expected.

  1. We could try qualifying with just <=, but then that would be a whole lot of data we select (You aren’t going to do this in Production, are you?), unless id is bucketed…but it isn’t in our situation.
  2. We could try qualifying with just >=, but then nothing will be returned (You aren’t testing a case like this either, surely).

Unfortunately for us, it doesn’t look like Cassandra has anything available to help us out here natively. But it certainly has something we can leverage.

UDFs for the win

User defined functions (UDFs) have been available in Cassandra since 2.2. If you aren’t familiar with them, there are examples of them available in a previous blog post and the official Cassandra documentation. Since Cassandra doesn’t have any other native functions to help us, we can just write our own UDF, as it should be.
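One caveat (an assumption about your cluster configuration): UDFs are disabled by default, so the following needs to be set in cassandra.yaml before the CREATE FUNCTION statements below will run:

# cassandra.yaml: required for Java UDFs (scripted UDFs have a separate setting)
enable_user_defined_functions: true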

Typically we may expect to want to select a slice up to a certain number of minutes ago. So we want to write a UDF to allow us to do that.

CREATE OR REPLACE FUNCTION stresscql2.minutesAgo ( arg int )
  RETURNS NULL ON NULL INPUT
  RETURNS bigint
  LANGUAGE java
  AS $$
    return (System.currentTimeMillis() - arg * 60 * 1000); $$;

This UDF is quite self explanatory so I won’t go into too much detail. Needless to say, it returns a bigint of arg minutes ago.

Here is a test to illustrate just to be safe:
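Something along these lines (a hedged reconstruction using toUnixTimestamp(), which also returns epoch milliseconds):

SELECT toUnixTimestamp(now()) AS now_ms, stresscql2.minutesAgo(5) AS five_min_ago_ms FROM system.local;
-- five_min_ago_ms should be roughly 300000 ms (5 * 60 * 1000) less than now_ms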

Here is our new and improved YAML profile:

simple1:
  cql: select * from twcstest where id = ? and time <= toTimestamp(now()) and time >= minutesAgo(5)

Now, when we execute cassandra-stress with simple1, we can expect just data within a certain time frame instead of selecting the whole partition. We can also keep varying the query to select older data if we like, for example, time >= minutesAgo(600) and time <= minutesAgo(590) for data up to 10 hours ago.

A variation with bucketing

We can also create UDFs that model bucketing behaviour. For example, suppose now we have a schema that has data bucketed, like this:

CREATE TABLE twcstestbucket (
  id text,
  bucket timestamp,
  time timestamp,
  metric int,
  value blob,
  PRIMARY KEY((id, bucket), time)
) WITH CLUSTERING ORDER BY (time DESC)
  AND compaction = { 'class':'TimeWindowCompactionStrategy', 'compaction_window_unit':'MINUTES', 'compaction_window_size':'20' }
  AND comment='A table to see what happens with TWCS & Stress'

And we want to be able to insert data in 5 minute buckets. We can create UDFs like so:

CREATE OR REPLACE FUNCTION stresscql2.nowInMilliSec()
  RETURNS NULL ON NULL INPUT
  RETURNS bigint
  LANGUAGE java
  AS $$
    return (System.currentTimeMillis()); $$;

CREATE OR REPLACE FUNCTION stresscql2.bucket( arg bigint )
  RETURNS NULL ON NULL INPUT
  RETURNS bigint
  LANGUAGE java
  AS $$
    java.time.ZonedDateTime time = java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochMilli(arg), java.time.ZoneOffset.UTC);
    java.time.ZonedDateTime lastFiveMinutes = time.truncatedTo(java.time.temporal.ChronoUnit.HOURS)
                  .plusMinutes(5 * (time.getMinute()/5));
    return (lastFiveMinutes.toEpochSecond() * 1000);
  $$;

CREATE OR REPLACE FUNCTION stresscql2.randomBucket(lowerbound bigint, upperbound bigint)
  RETURNS NULL ON NULL INPUT
  RETURNS bigint
  LANGUAGE java
  AS $$
    java.time.ZonedDateTime lower = java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochMilli(lowerbound), java.time.ZoneOffset.UTC);
    java.util.Random random = new java.util.Random();
    int numberOfBuckets = (int) (upperbound - lowerbound) / (5 * 60 * 1000);
    int targetBucket = random.nextInt(numberOfBuckets);
    return (lower.truncatedTo(java.time.temporal.ChronoUnit.HOURS).plusMinutes(5 * (lower.getMinute()/5)).plusMinutes(5 * targetBucket).toEpochSecond() * 1000);
  $$;

The UDF bucket is quite self explanatory as well – it just returns the nearest 5 minute bucket smaller than arg. This assumes UTC time and 5 minute buckets, but the code can easily be tailored to be more general.

However, our bucket UDF doesn’t understand timeuuid, which is why we need another helper function, nowInMilliSec().

The final UDF generates a random bucket based on a lower and upper bound time. The expected input bounds should be in epoch milliseconds. This will help in selecting old/random data bucketed to within 5 minutes in a range.

And now here is our new and modified YAML profile to accommodate our desires of having stress follow a bucketed workload:

putindata:
  cql: insert into twcstestbucket (id, bucket, time, metric, value) VALUES (?, bucket(nowInMilliSec()), toTimestamp(now()), ?, ?)
simple1:
  cql: select * from twcstestbucket where id = ? and bucket = bucket(nowInMilliSec()) and time <= toTimestamp(now()) and time >= minutesAgo(5)
  fields: samerow
selectold:
  cql: select * from twcstestbucket where id = ? and bucket = randomBucket(1524115200000, 1524129600000)

1524115200000 happens to be Thursday, April 19, 2018 5:20:00 AM in GMT time, while 1524129600000 happens to be Thursday, April 19, 2018 9:20:00 AM. The bounds can be tailored to suit your needs. It’s kind of ugly, but it will do the job.

And there we go: Tap into UDFs to be able to model a TWCS workload with Cassandra stress.

cassandra-stress user profile=stressspectwcs.yaml n=50000000 cl=QUORUM ops\(putindata=5,simple1=5,selectold=1\) -node file=nodelist.txt -rate threads=500 -log file=mixed.log -pop seq=1..1500M

There’s always an option of writing your own client and using that to perform stress instead, with the obvious benefit that there’s no need to write UDFs and you have control over everything. The downside is that you would have to write code that includes rate limiting and reporting of metrics, whereas cassandra-stress is the stress tool that comes with Cassandra out of the box and has very rich statistics, down to the latency of each query.

The post Using Cassandra Stress to model a time series workload appeared first on Instaclustr.


Kafka Connect is an API and ecosystem of 3rd party connectors that enables Apache Kafka to be scalable, reliable, and easily integrated with other heterogeneous systems (such as Cassandra, Spark, and Elassandra) without having to write any extra code. This blog is an overview of the main Kafka Connect components and their relationships. We’ll cover Source and Sink Connectors; Connectors, Plugins, Tasks and Workers; Clusters; and Converters.

For an example of how to use Kafka Connect see Apache Kafka “Kongo” Part 4.1 and 4.2: Connecting Kafka to Cassandra with Kafka Connect.

1. Source and Sink Connectors

At a high level, “Source connectors” pull data from an external system (the Source) and write it to Kafka topics. “Sink connectors” read data from Kafka topics and push it to an external system (the Sink). Each connector flavour is unidirectional, you can’t go against the flow. Here’s a simple diagram showing the high level Kafka Connect architecture with Source (green) and Sink (blue) data flows:

High Level Kafka Connect Architecture showing Source and Sink Flows

2. Connectors, Plugins, Tasks & Workers

There are three main components to the Kafka Connect API, each with a different role: Connectors, Tasks and Workers.

Connectors are either Source or Sink Connectors, and are responsible for some of the Task management, but not the actual data movement.

Tasks come in two corresponding flavours as well, Source and Sink Tasks. A Source Task will contain custom code to get data from the Source system (in the poll() method) and uses a Kafka producer which sends the data to Kafka topics. A Sink Task uses a Kafka consumer to poll Kafka topics and read data, and custom code to push data to the Sink system (in the put() method). Each Sink Task has a thread, and they belong to the same consumer group for load balancing.
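To make the split of responsibilities concrete, here is a minimal sketch of a custom Sink Task (hedged: error handling, batching and offset management are omitted, and ExternalSystemClient is a placeholder rather than a real library):

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {
    private ExternalSystemClient client; // placeholder client for the Sink system

    @Override
    public void start(Map<String, String> props) {
        // the Worker passes in the task configuration produced by the Connector
        client = new ExternalSystemClient(props.get("external.system.url")); // assumed config key
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // the Worker's consumer has already polled these records from Kafka topics;
        // the task only pushes them to the Sink system
        for (SinkRecord record : records) {
            client.write(record.key(), record.value());
        }
    }

    @Override
    public void stop() {
        client.close();
    }

    @Override
    public String version() {
        return "0.1";
    }
}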

The components work together like this (with inspiration from “Kafka: The Definitive Guide”):

Connector “Plugins” (Collections of Connectors and Tasks)

A Connector Plugin is a collection of Connectors and Tasks deployed to each Worker.

Connector

Connectors are responsible for deciding the number of tasks, splitting work between tasks, and getting configurations for the tasks from the workers and passing them to the Tasks. E.g. to decide how many tasks to run for a Sink, a Connector could use the minimum of tasks.max set in the configuration and the number of partitions of the Kafka topic it is reading from. The workers actually start the Tasks.

Tasks

Tasks are responsible for getting data into and out of Kafka (but only on the Source or Sink side, the Workers manage data flow to/from Kafka topics). Once started, Source Tasks poll Source systems and get the data that the Workers send to Kafka topics, and Sink Tasks get records from Kafka via the Worker, and write the records to the Sink system.

Workers

Workers are the processes that execute the Connectors and Tasks. They handle the REST requests that define connectors and configurations, start the connectors and tasks and pass configurations to them. If using distributed workers, and a worker process dies, then the connectors and tasks associated with the failed worker will be taken over and load balanced among the remaining workers.
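For example, with distributed workers a connector is defined by POSTing its configuration to any worker’s REST endpoint (a hedged sketch using the built-in file sink; the default REST port is 8083 and the settings are illustrative):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "test-file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "/tmp/test-sink.txt",
    "topics": "connect-test"
  }
}'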

3. Kafka Connect Clusters

A Kafka Connect Cluster has one (standalone) or more (distributed) Workers running on one or multiple servers, and the Workers manage Connectors and Tasks, distributing work among the available Worker processes. Note that Kafka Connect does not automatically handle restarting or scaling of Workers, so this must be handled with some other solution.

The following diagram shows the main relationships and functions of each component in a connect cluster. A Kafka connect cluster can be run on one or more servers (for production these will be separate to the servers that the Kafka Brokers are running on), and one (but potentially more) workers on each server. Data movement is shown with green lines:

Apache Kafka Connect Architecture UML Diagram

4. Kafka Connect Converters

Just like Catalytic Converters for cars, converters are also a key part of the Kafka connector pipeline! I initially found converters perplexing as Kafka consumers and producers already have (De-)Serializers. Are converters the same or different? Kafka doesn’t know anything about the data format of topic keys and values, it just treats them as byte arrays. So consumers and producers need to be able to convert objects to and from byte arrays, and that’s exactly what the (De-)Serializers do.

Doing some more research on Converters I found that the converter interface docs say:

“The Converter interface provides support for translating between Kafka Connect’s runtime data format and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization layer.”

I also found that Converter has fromConnectData() and toConnectData() methods that must be implemented for converting byte arrays to/from Kafka Connect Data Objects. Connect “Data Objects” have schemas and values, and a SchemaBuilder which provides a fluent API for constructing Schema objects. Schemas are optional to support cases with schema-free data. ConnectRecords (subclasses SinkRecord and SourceRecord) are analogous to Kafka’s ConsumerRecord and ProducerRecord classes, and contain the data read from or written to Kafka.
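Paraphrasing the interface (method signatures as I recall them from the 1.x Javadoc, so treat this as a sketch rather than a definitive reference):

// org.apache.kafka.connect.storage.Converter, approximately:
public interface Converter {
    // isKey tells the converter whether it is handling record keys or values
    void configure(Map<String, ?> configs, boolean isKey);
    // Connect data (optional Schema plus value) -> the byte[] written to the Kafka topic
    byte[] fromConnectData(String topic, Schema schema, Object value);
    // byte[] read from the Kafka topic -> Connect data (schema plus value)
    SchemaAndValue toConnectData(String topic, byte[] value);
}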

In conclusion, here’s how Sources, Tasks, Converters, Topics, (De-)Serializers and Sinks fit together to give a complete end-to-end Kafka data pipeline:

Complete end-to-end Kafka Data Pipeline

Finally, one nice feature of the Kafka Connect architecture is that because converters are decoupled from connectors, you can reuse any Kafka Connect Converter with any Kafka Connect Connector.

The post Apache Kafka Connect Architecture Overview appeared first on Instaclustr.


Kafkafying – the transformation of a primitive monolithic program into a sophisticated scalable low-latency distributed streaming application (c.f. “An epidemic of a zombifying virus ravaged the country”)

Steps for Kafkafying Kongo

In the previous blog (“Kongo” Part 2: Exploring Apache Kafka application architecture: Event Types and Loose Coupling)  we made a few changes to the original application code in order to make Kongo more Kafka-ready. We added explicit event types and made the production and consuming of events loosely-coupled using the Guava EventBus. In this blog we build on these changes to get an initial version of Kongo running on Kafka.

Step 1: Serialise/deserialise the event types

What is serialization? In publishing, serialization is when a larger book is published in smaller, sequential installments. Serialized fiction surged in popularity during Britain’s Victorian era. A well-known example is the Sherlock Holmes detective stories, which were originally serialized in The Strand magazine:

And deserialisation?  That’s what happens when you try and reconstitute the original complete stories.  Here’s all 79 Strand magazines with Sherlock Holmes stories:

The concept is similar in distributed systems, and is necessary due to differences in data types, programming languages, networking protocols, etc.  Data structures or Objects are serialized into bit streams, transmitted or stored somewhere else, and the process reversed and the bits deserialized into copies of the originals.

In the previous blog we created the explicit event types to prepare Kongo for Kafka:

Sensor, RFID Load, RFID Unload and Location check Events. To use them with Kafka we need to enable them to be serialized and deserialized.

From our introductory Kafka blog (Exploring the Apache Kafka “Castle” Part A: Architecture and Semantics) recall that the important Kafka concepts are Producers, Consumers and Topics. In Kafka, a ProducerRecord object is created to write a record (a value) onto a Kafka topic from a Producer. ProducerRecord takes a topic and a value (and optionally timestamp, partition, and key).

To create a ProducerRecord you need a KafkaProducer configured with key and value serializers (the record’s key is optional, the value is required), and some other parameters. Here’s an example using the built-in Kafka serializers for basic Java types, with the key and value as Strings:

// create and use a Kafka producer with key and value as String types
Properties props = new Properties();
props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put("client.id", "KongoSimulator");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> stringProducer = new KafkaProducer<>(props);

// write a string value to a topic (send() is asynchronous; get() blocks until the send completes)
stringProducer.send(new ProducerRecord<>("topic", "event 1")).get();

Here’s the list of built-in Kafka (de)serialization methods.

But what if you have more complex event types (like Kongo’s Sensor and RFID events) that you want to write and read from Kafka topics? Under the hood Kafka only writes and reads byte streams to/from topics, so Java objects can’t be directly written and read.  You have to write custom serializers.

To create custom serializers (such as for POJOs) the Serializer interface has to be implemented. We wrote a serializer class for each Kongo event type. Here’s the example for Sensor. Note that both the serializer and deserializer are in the same class for simplicity:

public class SensorSerializer implements Closeable, AutoCloseable, Serializer<Sensor>, Deserializer<Sensor> {

   public static final Charset CHARSET = Charset.forName("UTF-8");

   @Override
   public void configure(Map<String, ?> map, boolean b) {
   }
   
   @Override
   public byte[] serialize(String s, Sensor sensor) {
            String line =
    sensor.time + ", " +
    sensor.doc + ", " +
    sensor.tag + ", " +
    sensor.metric + ", " +
    sensor.value;
       return line.getBytes(CHARSET);
   }

   @Override
   public Sensor deserialize(String topic, byte[] bytes) {
       try {
           String[] parts = new String(bytes, CHARSET).split(", ");

           int i = 0;
           long time = Long.parseLong(parts[i++]);
           String doc = parts[i++];
           String tag = parts[i++];
           String metric = parts[i++];
           double value = Double.parseDouble(parts[i++]);

           return new Sensor(time, doc, tag, metric, value);
       }
       catch(Exception e) {
           throw new IllegalArgumentException("Error reading bytes", e);
       }
   }

   @Override
   public void close() {

   }
}

RFIDLoad and RFIDUnload Events also need similar code. This approach simply turns the Java objects into Strings and then into a byte stream, and is the simplest approach I could think of. There are more sophisticated approaches. This is a useful blog which looks at some alternative custom serializers, and is also based on a Sensor POJO example. It also has code examples. The blog “Kafka Serialization and the Schema Registry”
provides a more in-depth look at using the Apache Avro serializer and the Kafka Schema Registry which may give improved flexibility and type safety.

Luckily this is the first time I’ve had to think about writing explicit code for (de-)serialization for a (long) while. However, it’s similar to (un-)marshalling, which was a significant contributor to performance overhead in other distributed systems technologies such as CORBA, enterprise Java, and XML-based web services. It turns out that there are slower and faster Kafka (de-)serialization methods. If you want to read further, the following linked blog demonstrates that byte arrays are faster for reading and writing, and there’s also an example of using the default Java Serializable interface and Input/Output streams. Once I’ve got Kongo up and running on a more production-like Kafka cluster I may revisit the question of which is the best approach, for functionality and performance.
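For illustration, the Java Serializable alternative mentioned above looks roughly like this (a hedged sketch, not the Kongo code; it assumes the Sensor class implements java.io.Serializable):

import java.io.*;

public class SensorJavaSerializer {

    // Object -> byte[] using standard Java serialization
    public byte[] serialize(String topic, Sensor sensor) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(sensor);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException("Error serializing Sensor", e);
        }
    }

    // byte[] -> Object using standard Java deserialization
    public Sensor deserialize(String topic, byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Sensor) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("Error deserializing Sensor", e);
        }
    }
}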

After implementing Kafka Producers and Serializers, events can be written to Kafka topics. Different event types can be written to the same topic, or the same event types can be written to different topics. Now there is a choice to make about how to map the EventBus topics from the previous blog to Kafka topics. How many topics should there be?

Interlude

In order to add Kafka Producers and Consumers to the Kongo application, a new class called KafkaRun was written which acts as the top-level control for the simulation. It creates Kafka Consumers for each event type, and starts them running in their own thread(s). They are all single threaded at present but can have more threads if required. Then it starts the simulation running. The simulation code was modified to use the new Kafka Producers to send events to different topics. Note that the default is for Kafka topics to be created automatically, which is what we assumed (the code doesn’t explicitly create any topics). During the simulation the Consumers read messages from topics and process them as before (i.e. checking sensor/goods rules, moving goods around, and checking co-location rules).
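The automatic topic creation assumed here is controlled by a broker setting (shown as it appears in server.properties; it defaults to true, though production clusters often turn it off in favour of explicit topic creation):

# server.properties (broker): topics are auto-created on first use when this is true (the default)
auto.create.topics.enable=true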

Step 2: One or Many Topics? Alternative 1: The more topics the merrier!

(Cornered, from gocomics.com)

The simplest way to map the previous loosely coupled EventBus Kongo design to Kafka was to blindly follow the same design, and replace the EventBus code with equivalent Kafka producers, topics and consumers. For Sensor events, we previously used (2.1 Sensor Events: One or many topics?) many EventBus (topics), one for each location, with each Goods object (dynamically) subscribed to the topic corresponding to its current location:

The Kafka version of this was also implemented with a Kafka topic for each location, and a Kafka Consumer per Goods object polling a Kafka location topic for sensor events.

Extra code was written to implement the Consumers to allow Goods objects to dynamically subscribe to the correct Kafka location topic and run the sensor rules. We also needed RFID consumers to change the subscriptions when Goods were moved (in response to RFID Load and Unload events), and to check co-location rules. The simplest (and somewhat naive, as it turned out) implementation was to have a thread per consumer Goods object, with each consumer object in its own consumer group (to ensure that each Goods object got a copy of each sensor event per location). The code for this approach is in SensorGoodsConsumer, with some logic and options in KafkaRun and Simulate to create multiple sensor location topics and subscribe Goods consumers.

This diagram shows this approach with new Kafka components in black (rectangles are topics, diamonds are consumers):

This approach (obviously) resulted in lots of topics (100s) and consumer threads/groups (1000s).  Note that there was only 1 partition per topic, but due to the number of topics this meant there were also 100s of partitions.  Did it work?

Functionally, yes. Performance-wise? Probably not, but it was hard to determine exactly where the bottlenecks were as everything is currently running on a single machine.  For example, it’s possible that the problem was on the Kongo application side due to the large number of Consumer threads (we could have used a worker thread pool perhaps). In the long run this design isn’t scalable anyway – i.e. with thousands of locations and millions of Goods!

Note that the original EventBus RFIDLoad and RFIDUnload topics were also replaced by two equivalent Kafka topics and consumers (RFIDLoadEventConsumer, RFIDUnloadEventConsumer). Refer to the step 3 event ordering section below for further discussion.

Another related design choice is around alternative Consumer models. How many consumers and how many threads are desirable? This article explores the pros and cons of different Kafka consumer and thread models, and has code examples.

Alternative 2: One Topic (to rule them all)

One Ring to rule them all, One Ring to find them,
One Ring to bring them all and in the darkness bind them

inscription on the One Ring, forged by Sauron

Given that many topics design didn’t work as expected, what’s the alternative? How many topics can you have? What’s the general advice?

It seems that there is no hard limit, but topics and partitions obviously consume resources. The standard advice is that rather than having lots of topics, it’s better to have a smaller number of topics (possibly only 1) and multiple partitions. This also guarantees event order, as there’s no guarantee of event order across topics or within a topic across partitions, only within each partition in a topic.

Taking inspiration from the Dark Lord Sauron we changed the design to have one Kafka topic for sensor events for all locations, with (currently) a single Kafka consumer subscribed to it. The consumer reads all the sensor events and puts each event on one of the original EventBus location topics, so that only the Goods at that location get the event and check their sensor rules as before.  I.e. we are basically wrapping the original EventBus code with Kafka. The diagram shows this approach:

This code is SensorConsumer (with some logic in KafkaRun and Simulate.java to create 1 topic and subscribe consumers), and it works better than the approach with a large number of Kafka topics.
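A rough sketch of this single-topic consumer loop (hedged: the topic name, props, and the per-location EventBus lookup are illustrative assumptions rather than the exact SensorConsumer code):

// One Kafka consumer fans sensor events out to per-location Guava EventBus topics
KafkaConsumer<String, Sensor> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new SensorSerializer());
consumer.subscribe(Collections.singletonList("kongo-sensor-topic"));
while (running) {
    ConsumerRecords<String, Sensor> records = consumer.poll(100);
    for (ConsumerRecord<String, Sensor> record : records) {
        Sensor sensor = record.value();
        EventBus locationTopic = locationTopics.get(sensor.tag); // one EventBus per location
        if (locationTopic != null)
            locationTopic.post(sensor); // Goods at this location check their sensor rules
    }
}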

Step 3: Matter event order does? Depends it does.

The version of Kongo with a single sensor event topic sometimes produced exceptions related to the location of Goods (luckily I had put preconditions with warning messages in the code for moving the location of Goods, which checked if Goods were really in the location that an RFIDLoad or RFIDUnload event assumed). This turned out to be due to the use of the two distinct Kafka RFIDLoad and RFIDUnload topics, which means that the load and unload consumers can get events out of order. Event order actually matters for RFID events, i.e. a Goods must be loaded onto a truck before it can be unloaded. The obvious solution was to use a single Kafka topic for both RFIDLoad and RFIDUnload events in order to guarantee event order. The code for this approach is RFIDEvent, RFIDEventSerializer, and RFIDEventConsumer (which determines which RFID event type is received and posts a load or unload event to the EventBus).

For event stream processing applications like Kongo, the state of the world depends on a stream of events in a particular order, so a general principle to keep in mind is that all events about the same “thing” may need to go in the same Kafka topic to guarantee ordering.  Kafka streams are likely to be relevant for this so we’ll look at them in a future blog (as they provide state, windows etc).

Is this finally a correct and scalable design for a distributed Kafka version of Kongo? The current design has two Kafka topics, one dedicated to sensor events and one for RFID events. This gives correct event ordering within each event type but not across them. Could this be a problem? What happens if Goods are moved from a location before the sensor events can be checked for them at the location? Sensor rule violations could be missed.  Part of the solution to these sorts of problems is to ensure that the consumers for each topic can keep up with the throughput so that they don’t get behind. The consumers can keep track of their own progress relative to the latest event (i.e. current offset – last read offset, see SensorConsumer), and spin up/down more instances until the backlog is processed. Explicit use of timestamps for processing (in conjunction with Kafka streams) and more explicit use of topic keys and partitions are also options which we will explore in future blogs.
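A hedged sketch of the offset arithmetic mentioned above, using the standard consumer API (not the actual SensorConsumer code):

// lag = latest available offset minus the consumer's current position, per partition
TopicPartition tp = new TopicPartition("kongo-sensor-topic", 0);
long latestOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
long currentPosition = consumer.position(tp);
long lag = latestOffset - currentPosition; // if this keeps growing, add more consumers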

Another approach is suggested by the fact that we detected event ordering problems as a result of having explicit preconditions in the Kafka consumer code for RFID events. If event ordering preconditions are not satisfied for an event it may be possible to postpone processing and retrying later on. Because Kafka consumers keep track of their own topic offsets a simple approach is for consumers to remember which events have problems, and retry them periodically, or even just write them back into the same topic..


In the previous article, we gained an understanding of the main Kafka components and how Apache Kafka consumers work. Now, we’ll see how these contribute to the ability of Kafka to provide extreme scalability for streaming write and read workloads.

The post Developing a Deeper Understanding of Apache Kafka Architecture Part 2: Write and Read Scalability appeared first on Instaclustr.


The Apache Kafka distributed streaming platform features an architecture that – ironically, given the name – provides application messaging that is markedly clearer and less Kafkaesque when compared with alternatives. In this article, we’ll take a detailed look at how Kafka’s architecture accomplishes this.

The post Developing a Deeper Understanding of Apache Kafka Architecture appeared first on Instaclustr.


Open source software is tricky business. One might think a volunteer project that gives you free software is the greatest thing ever; however, making such a project work is complex. Creating open source software is quite simple for a small project with only a few hobbyist maintainers, where making decisions comes down to only one person or a very small group, and if users don’t like it they can fork the project and continue on their way without many hurdles. Things are not so simple for large projects with multiple stakeholders, where priorities are frequently conflicting but the health of the project still relies on all contributors behaving as, well, a community. This is what I’ll be writing about, and more specifically the do’s and don’ts when contributing to large open source projects.

But first, let’s talk about the kind of community that makes up an (large) open source project.

Four main types of contributors for an open source project
  1. The full timer who usually works for a company which utilizes/backs the project. This person is employed by the company to work on the project, usually directed by the company to work on specific bugs and features that are affecting them, and also on larger feature sets. They often work in a team within their company who are also working on the project. In some cases, these full-timers are not dedicated to writing code but more dedicated to the managerial side of the project. Part-timers similar to these also exist.
  2. The part-timer who has a vested interest in the project. Mostly these are consultants, but could still be from companies who use the software but don’t have enough resources to contribute full-time. Generally, they contribute to the project because it heavily influences their day jobs, and they see users with a certain need. Usually have a very good understanding of the project and will also contribute major features/fixes as well as smaller improvements. They may also just be very well versed users who contribute to discussions, helping other users, and documenting the software.
  3. The part-timer who has some interaction with the software during their day job, but is not dedicated to working on the software. These people often contribute patches related to specific issues they encounter while working with the software. Typically these people are sysadmins or developers. I’d sum these up as “the people that encounter something that annoys them and fix it”.
  4. The users. No point having all this software if there is no one to use it. Users contribute on mailing lists and ancient IRC’s, helping other users get on board with the software. They also give important feedback to the developers on improvements, bug fixes, and documentation, as well as testing and reporting bugs they find. Typically in a large project, they don’t drive features significantly, but it can happen.

There are many other types of contributors to a project, but these (to me) seem to be the main ones for large projects such as Apache Cassandra. You’ll note there is no mention of the hobbyist. While they do exist, in such large projects they only usually come about through extraneous circumstances. It’s quite hard to work on such a large project on the side, as it generally requires a lot of background knowledge, which you can only really grasp if you’ve spent countless hours working with the software already. It is possible to pick up a very small task and complete it without much knowledge about the project as a whole, but these are rare, which results in less hobbyists working on the project.

It’s worth noting that all of these contributors are essentially volunteers. They may be employed full time to work on the project, but not by the project. The company employing them volunteers their employees to work on the project.

Now there are a few important things to consider about a large project with a contributor-base like the above. For starters, priorities. Every contributor will come to the project with their own set of priorities. These may come from the company they work for, or may be itches they want to scratch, but generally, they will be directed to work on certain bugs/features and these bugs/features will not always coincide with other contributors priorities. This is where managing of the project gets complicated. The project has a bunch of volunteers, and these need to be organized in a way that will produce stable, functioning software that meets the needs of the user base, at least in a somewhat timely fashion. The project needs to be kept healthy and needs to continue satisfying the users needs if it is to survive. However, the user’s needs and the needs of the people writing the code often don’t intersect, and they don’t always see eye to eye. On a project run by volunteers this is important to consider when you’re asking for something, because although you may have a valid argument, there might not be someone who wants to make the contribution, and even if there is, they might not have a chance to work on it for a long time/ever.

Do’s
  1. Take responsibility for your contributions. I’ve noted it’s a common opinion that developers are only beholden to their employer, but this is not true. If you wrote code in an open source project, you’re still responsible for the quality and performance of that code. Your code affects other people, and when it gets into an open source project you have no idea what that code could be used for. Just because you’re not liable doesn’t mean you shouldn’t do a good job.
  2. Be polite and respectful in all discussions. This is very important if you want to get someone to help you in the project. Being rude or arrogant will immediately get people off-side and you’ll have a very hard time making meaningful contributions.
  3. Be patient. Remember OSS is generally volunteer-based, and those volunteers typically have priorities of their own, and may not be able to be prompt on your issues. They’ll get to it eventually, nudge them every now and again, just don’t do it all the time. I recommend picking up a number of things to do that you can occupy yourself with while you wait.
  4. Contribute in any way you can. Every contribution is important. Helping people on the mailing list/public forums, writing documentation, testing, reporting bugs, verifying behavior, writing code, contributing to discussions are all great ways to contribute. You don’t have to do all of them, and a little bit of help goes a long way. This will help keep Open Source Software alive, and we all want free software don’t we?
Don’ts
  1. Don’t assume that just because you have an idea that other people will think it’s good. Following that, don’t assume that even if it is good, someone else will be willing to implement it.
  2. Don’t assume that OSS is competing with any other software. If something better comes along (subject to licensing), it would make sense for the effort to be directed towards the new software. The only thing keeping the project alive is that people are using it. If it stops being relevant, it will stop being supported.
  3. Don’t expect other volunteers to work for you. If you have a great idea you must still be prepared to wait and get involved yourself to get it implemented. The nature of large OSS projects is that there are always more ideas than there are people to implement them, and the contributors are more likely to prioritize their own ideas over yours. If you can do some of the legwork to get your ideas in place (proof of concepts, design documents, validation, etc) it will go a long way to making your idea a reality.
  4. Don’t expect to show up and be listened to. It takes years of working with a large project before you have enough knowledge (and wisdom) to make significant improvements. If you just show up and throw your ideas about like they are better than sliced bread you’ll likely put existing contributors on edge. Start small and incrementally build yourself a reputation of which people will give your ideas the consideration it deserves.
  5. Don’t waste people’s time. It may seem harsh, but things like not providing enough detail to diagnose problems when reporting bugs are huge time wasters and generally lead to your problems getting lost in the backlog. Make sure you always search the backlog for existing related issues and make sure you are prepared to provide all relevant information for the maximum chance of your request being implemented.

Hopefully, this gives a good overview of the kind of community that makes up an open source project and gives you a good idea of what you’re dealing with when you’re looking to contribute to <insert favorite OSS software here>. If you follow these simple do’s and don’ts you’ll have the best chances of success when making contributions. Don’t hold off, contribute today!

The post Contributing to Open Source Software appeared first on Instaclustr.


This is the second post in our series exploring designing and developing an example IoT application with Apache Kafka to illustrate typical design and implementation considerations and patterns. In the previous blog, we introduced our Instaclustr “Kongo” IoT Logistics Streaming Demo Application. The code for Version 1 of the Kongo application was designed as an initial stand-alone, tightly-coupled prototype to demonstrate the functional behaviour. It won’t cope with any failures and won’t scale beyond a single server. In this post, our goal is to re-engineer the Kongo application to introduce Apache Kafka so that it is more reliable and scalable.

In order to get Kongo ready for Kafka we decided to make an intermediate design change to make it more loosely coupled. This involved (1) adding explicit event types, and (2) adding an Event Bus to introduce topics and allow objects to publish, subscribe to, and consume events from those topics. See Version 2 of the Kongo application for the code details.

1 Event types

Kongo has two main types of events: sensor events and RFID events. Sensor events are produced by warehouses and trucks and represent measurements of metrics at a particular time and place. RFID events are produced by RFID readers when goods are moved through a warehouse dock to and from trucks, and are either LOAD or UNLOAD events. We added classes for RFIDLoadEvent and RFIDUnloadEvent which are initially just containers for the data, including <time, warehouseKey, goodsKey, truckKey>. Both classes have the same fields but there is a difference in semantics (i.e. the truck is either loaded or unloaded).

We also added a Sensor event class with <time, warehouse or truck location, metric, value> fields. The simulation loop (Simulate.java) was changed so that explicit RFID Load and Unload and Sensor event objects are now created. But what should we do with them?
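As a rough sketch (field names here are illustrative – see the Version 2 code on GitHub for the real classes), the event containers look something like this:

public class Sensor {
    public long time;         // time of the measurement
    public String docType;    // e.g. "SENSOR WAREHOUSE" or "SENSOR TRUCK"
    public String tag;        // warehouse or truck location key
    public String metric;     // e.g. "temp"
    public double value;      // measured value

    public Sensor(long time, String docType, String tag, String metric, double value) {
        this.time = time; this.docType = docType; this.tag = tag;
        this.metric = metric; this.value = value;
    }
}

public class RFIDLoadEvent {  // RFIDUnloadEvent has identical fields
    public long time;
    public String warehouseKey;
    public String goodsKey;
    public String truckKey;
}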

2 Loose-coupling

An earlier attempt at a loosely-coupled communication system

To get the application ready for Kafka we need to make it loosely coupled, in a way which will hopefully make it easier to distribute across multiple processes and servers. As an initial step towards this, we decided to introduce a publish-subscribe eventing pattern. Typically this means creating one or more event queues, defining what will happen when each type of event is received, setting up subscriptions to topics, and sending events to the correct event queues. We used the Google Guava EventBus to implement this. EventBus allows publish-subscribe-style communication between components (but isn’t a general pub-sub inter-process mechanism).
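Before applying the pattern to Kongo, here’s a minimal, self-contained Guava EventBus sketch (not Kongo code) showing the register/post/@Subscribe pattern used below:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class EventBusExample {
    static class GreetingListener {
        @Subscribe
        public void onGreeting(String greeting) {  // called for every String posted
            System.out.println("Received: " + greeting);
        }
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus("demo");       // a "topic"
        bus.register(new GreetingListener());      // subscribe the listener's methods
        bus.post("Hello, EventBus!");              // delivered to onGreeting()
    }
}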

2.1 Sensor Events: One or many topics?

We tackled the sensor events first. The initial design was for a single sensor topic with all the sensor events (from warehouses and trucks) posted to it. E.g. in the simulate loop:

Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehouseKey, "temp", value);
sensorTopic.post(sensor);

In the original prototype code, there is an inefficient and tightly coupled call to a method to find all the Goods at the location of the sensor and check all the rules. Introducing loose coupling means that the sensor event producers don’t need to have any knowledge of what happens to the events anymore. But something does!

The initial loosely-coupled design registered all Goods objects with the topic when the Goods were first created. I.e.

EventBus sensorTopic = new EventBus("sensorTopic");

for (Goods goods : allGoods.values())
    sensorTopic.register(goods);

The EventBus register method registers listener methods on the object passed in the argument. Listener methods must have a @Subscribe annotation and are specific to event types that will be posted. For example, the listener method for the Goods objects looks like this:

// Subscribe the Goods object listener to Sensor events
@Subscribe
public void sensorEvent(Sensor sensor)
{
    System.out.println("GOT SENSOR EVENT! Object=" + tag + ", event=" + sensor.toStr());
    if (goodsInLocation(sensor.tag))
    {
        String v = violatedSensorCatRules(sensor);
        // etc
    }
}

This worked, but had the obvious problem that every sensor event was sent to every Goods object, which then had to check if it could ignore the event or not (based on location).  Here’s a diagram showing this “many-to-many” pattern:


This won’t scale to millions of Goods objects. One of the expected benefits of pub-sub is to enable consumers to subscribe to only a subset of events that are relevant to them. A simple improvement was therefore to have multiple EventBuses (topics), one topic per location, and then subscribe Goods to only the topic that they are located at. Here’s the code for publishing sensor events to the correct topic. For the full code see Github.

// Find the topic corresponding to the warehouse location
// and post the sensor event to only that topic
EventBus topic = topics.get(warehouseKey);
Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehouseKey, "temp", value);
topic.post(sensor);
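For completeness, here’s a hedged sketch of how the per-location topics might be created up front (the collection names warehouseKeys and truckKeys, and the Goods location field, are illustrative):

HashMap<String, EventBus> topics = new HashMap<String, EventBus>();
for (String warehouseKey : warehouseKeys)
    topics.put(warehouseKey, new EventBus(warehouseKey));
for (String truckKey : truckKeys)
    topics.put(truckKey, new EventBus(truckKey));

// Goods are then registered with (only) the topic for their current location:
topics.get(goods.location).register(goods);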

This refinement worked well, and is highly scalable, even with increasing numbers of topics (e.g. 1000 warehouses + 2000 trucks = 3000 topics) and Goods objects (millions).  Here’s the diagram for this pattern:

In theory there is a further refinement, based on the observation that not all Goods objects care about every sensor metric, so it may be possible to further refine the subscriptions by metric type (although at some point the code would arguably no longer be loosely coupled).

This isn’t yet a complete solution, as Goods objects move around and we need to run the co-location rules to check if Goods of different categories are permitted to travel on the same truck. As Goods change location, the subscriptions must be dynamically updated. We haven’t implemented RFID event topics and handlers yet, and as this is where the Goods movement events come from, this is obviously the next step.

2.2 RFID Events

The design for RFID events was to have an EventBus/topic for each of load and unload events, with the handlers defined in the RFIDLoadEvent and RFIDUnloadEvent classes (in hindsight 2 topics weren’t really necessary given that we had different RFID event types):

rfidLoadTopic = new EventBus("load");
rfidUnloadTopic = new EventBus("unload");
RFIDLoadEvent loadHandler = new RFIDLoadEvent();
rfidLoadTopic.register(loadHandler);
RFIDUnloadEvent unloadHandler = new RFIDUnloadEvent();
rfidUnloadTopic.register(unloadHandler);

What should the handlers do for each event type? For RFIDUnloadEvent the logic is as follows:

RFIDUnloadEvent: Move the Goods object from a truck to a warehouse

  1. Get the key of the Goods to unload from the event.
  2. Find the Goods object given the Goods key.
  3. Find the location topic that the Goods key is currently registered with (truck).
  4. Unregister the Goods from that topic.
  5. Find the location topic of the warehouse.
  6. Register the Goods object with that topic.

What this sequence of steps achieves is that the Goods object will stop receiving sensor events from the truck location, and start receiving sensor events at the warehouse location.
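A hedged sketch of the unload handler (the allGoods and topics maps, and the Goods location field, are named for illustration only):

@Subscribe
public void rfidUnloadEvent(RFIDUnloadEvent event)
{
    Goods goods = allGoods.get(event.goodsKey);                // steps 1 and 2
    EventBus truckTopic = topics.get(goods.location);          // step 3 (truck)
    truckTopic.unregister(goods);                              // step 4
    EventBus warehouseTopic = topics.get(event.warehouseKey);  // step 5
    warehouseTopic.register(goods);                            // step 6
    goods.location = event.warehouseKey;                       // remember the new location
}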

The RFIDLoadEvent handler is slightly more complex as it must also deal with co-located Goods rules checking:

RFIDLoadEvent: Move the Goods object from a warehouse to a truck, and check for co-location rules violations

  1. Get the key of the Goods to load from the event.
  2. Find the Goods object given the Goods key.
  3. Find the location topic that the Goods key is currently registered with (warehouse).
  4. Unregister the Goods from that topic.
  5. Find the location topic of the truck.
  6. Create a ColocatedCheckEvent <time, goods, truck>.
  7. Post this event to the truck location topic.
  8. Register the Goods object with the truck location topic.

Steps 1–6 are similar to the steps for the Unload events, but the remaining two steps are critical: they construct a new event type (ColocatedCheckEvent) and post it to the truck location topic. This results in all the Goods objects that have already been loaded onto the truck checking their rules to see if they are happy with the new Goods object to be loaded or not.
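And a matching sketch of the load handler (again, the lookups and field names are illustrative):

@Subscribe
public void rfidLoadEvent(RFIDLoadEvent event)
{
    Goods goods = allGoods.get(event.goodsKey);                     // steps 1 and 2
    EventBus warehouseTopic = topics.get(goods.location);           // step 3 (warehouse)
    warehouseTopic.unregister(goods);                               // step 4
    EventBus truckTopic = topics.get(event.truckKey);               // step 5
    ColocatedCheckEvent check =
        new ColocatedCheckEvent(event.time, goods, event.truckKey); // step 6
    truckTopic.post(check);                                         // step 7
    truckTopic.register(goods);                                     // step 8
    goods.location = event.truckKey;
}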

The Goods class has a new handler method, colocatedRulesEvent(ColocatedCheckEvent event), which gets the Goods object to be loaded and checks the co-location rules between “this” object and the object to be loaded. Note that we are now sending two event types to sensor topics, and each type has a different handler.
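A hedged sketch of that handler (the violatedColocationRules method name is illustrative):

@Subscribe
public void colocatedRulesEvent(ColocatedCheckEvent event)
{
    // "this" Goods object is already on the truck; event.goods is about to be loaded
    String violation = violatedColocationRules(event.goods);
    if (violation != null)
        System.out.println("CO-LOCATION VIOLATION! " + violation);
}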

I initially (accidentally) tried a simpler solution which just re-used the sensor location topics and added the RFID event handlers to the Goods objects. This also worked, but required each object to check if it was the intended recipient of the load/unload event. In practice it is inefficient, as every Goods object except the one being moved would unnecessarily receive the event and then just ignore it.

What’s still missing? We probably need to do something with the rules violations (sensor and co-location). The obvious thing would be to have a violations topic and publish the violation events to it.

Did these design changes make it easier to migrate the Kongo application to Kafka? Find out in the next blog.

A grammatical note: Goods is both plural and singular!

That makes sense, as using “Good” for a singular Goods (which I have caught myself doing) could cause confusion, since it reads as an ethical judgement (in contrast to “Bad”).

https://www.wordhippo.com/what-is/the-singular-of/goods.html

The post “Kongo” Part 2: Exploring Apache Kafka application architecture: Event Types and Loose Coupling appeared first on Instaclustr.


Financial services industries such as banking, insurance and capital management have been built on data since they first began. From ancient money lenders with clay tablets to early 20th century clerks with rooms full of ledgers to modern high-frequency trading, efficiently recording, retrieving and analysing data has been a key competitive advantage for successful financial services organisations.

Introduction

In the modern age, the pressures to gain a technological edge in data processing are coming not only from the competition but also from consumers. Gaining a competitive edge requires systems that can collect and quickly analyse vast streams of data. Consumers expect that the systems they interact with will be instantly up to date, always available and, increasingly, be aware of the context of all their previous interactions and related information.

Addressing these joint pressures, while containing technology costs, requires the adoption of a new generation of architectural patterns and technologies.

Cloud-based, open source solutions are increasingly being used to help financial services organisations meet these demands by breaking the constraints of legacy systems and:

  • providing true always-on services;
  • serving high transaction (particularly read) volume use cases at a reasonable cost;
  • meeting extreme scalability and latency requirements; and
  • enabling analytics and AI-driven innovation.

At Instaclustr, we are specialists in three open source technologies that are commonly used in these scenarios: Apache Cassandra, Apache Spark and Apache Kafka. This paper examines the advantages and typical use of these technologies in financial services architectures.

Benefits of Apache Cassandra

Apache Cassandra is a horizontally scalable, highly available open source database system. Its design, when properly implemented, allows unlimited scalability in terms of volumes of data stored and operations per second served. Cassandra’s masterless architecture and native support for replication within and across data centers allow organisations to achieve the highest levels of availability while minimising infrastructure and management costs. These characteristics make Cassandra ideally suited as a data store for modern financial applications that must deal with vast streams of data while being always on.

Some architects and developers who are familiar with Cassandra may have heard that it is an “eventually consistent” database (meaning updates may take some time to be applied to all nodes in a cluster) and be concerned about whether this is suitable in a financial services setting. However, it is more correct to describe Cassandra as “tuneably consistent”. That is, Cassandra allows you to balance the level of investment in redundant infrastructure, availability in the face of failures, and consistency guarantees to reach the solution that best fits your use case. Many, if not most, Cassandra applications run with guaranteed consistency.
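As a rough illustration (the keyspace, table and column names here are hypothetical), the DataStax Java driver lets you choose a consistency level per statement, so a balance read can use QUORUM while a high-volume audit write can use ONE:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
Session session = cluster.connect("bank");   // hypothetical keyspace

// Strong consistency: with replication factor 3, QUORUM reads plus QUORUM
// writes guarantee a read always sees the latest acknowledged write.
SimpleStatement read = new SimpleStatement(
    "SELECT balance FROM accounts WHERE account_id = ?", "12345");
read.setConsistencyLevel(ConsistencyLevel.QUORUM);
session.execute(read);

// Relaxed consistency for a high-volume, non-critical audit write.
SimpleStatement audit = new SimpleStatement(
    "INSERT INTO audit_log (account_id, event_time, detail) VALUES (?, now(), ?)",
    "12345", "balance checked");
audit.setConsistencyLevel(ConsistencyLevel.ONE);
session.execute(audit);

cluster.close();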

That said, financial services processes such as banking are in fact the original model of eventual consistency. Consider a transaction made at a bank branch in one country that eventually finds its way to the ledger of another bank overseas after a series of batch exchanges. So financial services solution architects may be well placed to understand the trade-offs of eventual consistency and make good use of this feature in their solutions.

Benefits of Apache Spark

Apache Spark is a modern, in-memory, distributed analytics engine with an architecture informed by big data platforms such as Hadoop MapReduce, but designed to build on the experience of those platforms and overcome their limitations. Apache Spark is currently the most active open source project in the big data world. The Apache Spark engine is up to 100 times faster than MapReduce in memory and around ten times faster from disk. Spark supports a broad range of analytics capabilities ranging from SQL queries over big data to graph analytics and machine learning. Spark can be operated in both batch processing and streaming modes, and it supports multiple languages for analytics job development.
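To give a flavour of the programming model (the file path and column names below are purely illustrative), a Spark SQL job in Java can read a dataset, register it as a table and query it with plain SQL:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("DailySpendReport")
    .getOrCreate();

// Hypothetical transactions file; in practice this could equally be a
// Cassandra table read via the Spark Cassandra Connector.
Dataset<Row> transactions = spark.read()
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs:///data/transactions.csv");

transactions.createOrReplaceTempView("transactions");

// Flag accounts with an unusually high total spend using plain SQL.
Dataset<Row> flagged = spark.sql(
    "SELECT account_id, SUM(amount) AS total_spend " +
    "FROM transactions GROUP BY account_id HAVING SUM(amount) > 10000");

flagged.show();
spark.stop();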

The Spark analytics engine can be deployed as an interactive tool for use by data analysts (likely with the assistance of a notebook-style UI such as Apache Zeppelin) or embedded into an application architecture for automated analytic tasks such as fraud detection.

Benefits of Apache Kafka

Apache Kafka is a queuing and streaming platform built on architectural patterns similar to those of Cassandra and Spark, again providing levels of scalability and availability that cannot be achieved with traditional monolithic architectures.

Kafka can be used in your architecture:

  • as a message bus – providing loose coupling between producers and consumers of messages (see the sketch after this list);
  • as a store of logical transactions for populating analytical data stores or edge caches;
  • as a buffer to manage back pressure in systems subject to workload spikes; and
  • (along with Kafka Streams or Spark Streaming) as the basis of a streaming architecture for real-time analytics.
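As a minimal sketch of the message-bus pattern (the topic name and message payload here are hypothetical), a producer only needs to know the Kafka cluster and a topic name; any number of consumers can subscribe to that topic independently and process events at their own pace:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// The producer knows nothing about who will consume these events.
producer.send(new ProducerRecord<>("transactions", "account-12345", "{\"amount\": 100.00}"));
producer.close();
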
Conclusion

Together or separately, these leading open source products can all help financial services companies to meet the growing data and analysis demands of their business while containing and even reducing costs. Key examples of areas where we see the open source technologies deployed in financial services include:

  • data caches supporting consumer banking apps and open banking APIs;
  • consumer insight applications;
  • fraud detection analytics; and
  • core architecture components for fintech companies (transaction database, enterprise message bus, etc).

Instaclustr, through our Managed Service, Enterprise Support and Consulting services, has helped many companies in financial services and other industries achieve these benefits of open source.

The post Open Source Use Cases In Financial Services appeared first on Instaclustr.


Instaclustr is excited to announce our managed service support for Elassandra 5.5, updating our release of Elassandra 2.5 last year.

The new release includes:

  • Kibana 5.5
  • Elassandra 5.5.0.13 (based on Cassandra 3.11.2 and Elasticsearch 5.5)

Elassandra gives you the full power of Elasticsearch to search your Cassandra data – without having to build custom integration or synchronisation code.  

Elassandra provides:

  • The ability to use Elasticsearch functionality to retrieve data stored in Cassandra through CQL using an Elasticsearch-based secondary index.
  • The ability to retrieve data stored in Cassandra through standard Elasticsearch REST APIs.
  • Kibana – a powerful user interface to explore, analyse and visualise your data via search.
  • All of this, provisioned within minutes and monitored, managed and supported by Instaclustr’s renowned managed service capability.
What’s new in Elassandra 5.5:

  • Improved Elasticsearch indexing and search performance
  • Elasticsearch Painless Scripting
  • New Elasticsearch data structures
  • Improved Elasticsearch Search API and Aggregations
  • Java REST client
  • Compatible with Spark 2.1.1 and Zeppelin 0.7.1
  • Compatible with Elasticsearch 5.x API

If you would like further information, please contact info@instaclustr.com.

The post Instaclustr Releases Elassandra 5.5 appeared first on Instaclustr.
