Overview
Stream processing is about processing data as it arrives. Unlike batch
processing, it doesn't make businesses wait a certain amount of time
(usually hours to a day, depending on the volume of data) to store,
analyze and get results on the incoming data. This is particularly
useful for fraud detection, system monitoring, stock market analysis,
website activity tracking and many other cases. Early detection is key
in all of them, so stream processing is a perfect fit.
Apache Kafka, a distributed streaming platform, has served as a reliable
data source for stream processing frameworks such as Apache Spark
Streaming, Apache Storm, Apache Samza and Apache Flink. Starting with
version 0.10.0, the Kafka client libraries also include a powerful
stream processing library, Kafka Streams. This means there is no need
for an external processing framework just to process data that is
already in Kafka.
Here we will see how to implement fraud detection using the Kafka
Streams API. ATM transactions can be streamed to a Kafka topic as they
happen and then processed to detect suspicious activity.
Prerequisites
Before continuing, you should be aware of the following concepts, core
to stream processing:
- Stream-Table duality
- Time windows in stream processing
- Stream repartitioning
- Stream-Stream and Stream-Table joins
- Topology
If you are not familiar with these concepts, you can read Kafka: The
Definitive Guide, a free book by Confluent.
Approach
We consider a transaction suspicious if:
- Two or more transactions happen on the same account within 10
minutes.
- They are made at different ATMs.
- The ATMs are too far apart to travel between them in 10 minutes or
less.
A typical ATM transaction record contains the following fields:
- Account number
- ATM ID
- Time of transaction
- Amount
- Transaction ID
The first two criteria are easy to check with this data. For the last
criterion, we calculate the speed required to reach the next ATM. If
that speed is implausibly high, we treat the activity as suspicious.
To calculate the speed between ATMs, we also need the geolocations
(coordinates) of the ATMs. From them we can find the distance between
the two ATMs "as the crow flies". Dividing that distance by the time
between the two transactions gives the required speed.
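As a rough illustration of this calculation, here is a self-contained sketch that computes the great-circle distance with the haversine formula (using the same 6367 km Earth radius as calculateDistanceInKM in the JoinedAtmTransactions class) and converts it into the speed needed to cover that distance between two transaction times. The class RequiredSpeed and its methods are hypothetical names used only for this example.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: distance "as the crow flies" and the speed needed to cover it. */
final class RequiredSpeed {
    // Earth radius in km, matching the value used in the article's calculateDistanceInKM.
    private static final double EARTH_RADIUS_KM = 6367.0;

    private RequiredSpeed() {}

    /** Great-circle distance in km between two points given in degrees (haversine formula). */
    static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.pow(Math.sin(dLat / 2), 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.pow(Math.sin(dLon / 2), 2);
        return EARTH_RADIUS_KM * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    /** Speed in km/h needed to travel distanceKm between the two instants. */
    static double requiredKmh(double distanceKm, Instant earlier, Instant later) {
        double hours = Duration.between(earlier, later).toMillis() / 3_600_000.0;
        // Non-positive elapsed time is treated as an impossible (infinite) speed.
        return hours > 0 ? distanceKm / hours : Double.POSITIVE_INFINITY;
    }
}
```

For example, two ATMs 100 km apart used 30 minutes apart require 200 km/h, far beyond what someone could plausibly travel between them.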
Sample Data
Here is a sample of the data we expect to receive on our topic. It
should include the geo location of the ATM in addition to the data
mentioned above.
{
"account_id": "a54",
"timestamp": "2018-12-13 11:23:07 +0000",
"atm": "ATM : 301736434",
"amount": 50,
"location": {"lat": "53.7916054", "lon": "-1.7471223"},
"transaction_id": "77620dac-fec9-11e8-9027-0242ac1c0007"
}
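Two details of this record matter later: the timestamp uses the pattern yyyy-MM-dd HH:mm:ss Z (the same pattern that appears in the @JsonFormat annotations of JoinedAtmTransactions), and lat and lon arrive as JSON strings rather than numbers. The sketch below shows, with java.time only, how those raw values can be turned into typed values. The class SampleFields is hypothetical, and the JSON parsing itself (done with Jackson in the project) is left out.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that converts the raw string fields of a transaction record. */
final class SampleFields {
    // Same pattern as the @JsonFormat annotations; a single "Z" parses offsets like "+0000".
    static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z");

    private SampleFields() {}

    /** Parses a value such as "2018-12-13 11:23:07 +0000". */
    static OffsetDateTime parseTimestamp(String raw) {
        return OffsetDateTime.parse(raw, TIMESTAMP_FORMAT);
    }

    /** Coordinates arrive as strings, e.g. "53.7916054"; convert them before any distance math. */
    static double parseCoordinate(String raw) {
        return Double.parseDouble(raw);
    }
}
```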
Implementation
Create Project
First, we need to create a project for our application. I use Maven as
my build tool, so I created a pom.xml file in the project directory and
added kafka-streams as a dependency.
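<dependencies>
    <dependency>
        <!-- kafka.version must be defined in <properties>; JoinWindows.of(Duration), used below, needs Kafka 2.1.0 or later -->
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>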
You can find the entire project on GitHub.
Main Class
ATMFraud.java contains the main method for the application. It starts
off by creating a list of property entries needed for connecting to
message broker.
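import static org.apache.kafka.common.serialization.Serdes.String;
import static org.apache.kafka.streams.StreamsConfig.*;

import java.util.Properties;

Properties properties = new Properties();
properties.put(APPLICATION_ID_CONFIG, "streams-atm-fraud-detector");
properties.put(BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, String().getClass());
// AtmTransactionSerde() comes from the project itself (see the serde described below)
properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, AtmTransactionSerde().getClass());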
Note that I run Kafka separately on my machine; the application code
here does not start or stop Kafka. I plan to add that later.
The StreamsBuilder class lets us connect to a Kafka topic and read its
contents as a stream. Below, we connect to the topic my_atm_txns_gess as
a stream and filter out any null messages. We then re-key the stream by
a useful key, the account ID; because the key changes, Kafka Streams
repartitions the stream through an internal topic before the join.
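import static org.apache.kafka.common.serialization.Serdes.String;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, AtmTransaction> stream1 = streamsBuilder
        .stream("my_atm_txns_gess", Consumed.with(String(),
                AtmTransactionSerde(),
                new ATMTransactionTimestampExtractor(),
                Topology.AutoOffsetReset.EARLIEST))
        .filter((key, value) -> value != null)                             // drop null messages
        .map((key, value) -> KeyValue.pair(value.getAccountId(), value)); // re-key by account ID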
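The stream method above accepts:
- The name of the topic to connect to.
- Key and value Serdes (Kafka's short name for a serializer and
deserializer pair).
- An instance of a TimestampExtractor implementation, required for
correct windowing of the messages: windows are then based on the
transaction time carried in the message rather than on the Kafka
record's own timestamp.
- An AutoOffsetReset policy, which chooses where to start reading when
the application has no committed offset: from the beginning of the
topic (EARLIEST) or from the end (LATEST).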
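The post does not show ATMTransactionTimestampExtractor. As a hedged sketch of the decision such an extractor typically makes: use the transaction's own timestamp (event time) when it is available, and otherwise fall back to the long value Kafka Streams passes as the second argument of TimestampExtractor.extract. Only that decision is shown, in plain Java so it runs without Kafka on the classpath; the class and method names are hypothetical, and the fallback policy is an assumption rather than the project's actual behavior.

```java
import java.util.Date;

/** Hypothetical core of an event-time extractor for ATM transactions. */
final class EventTimeChoice {
    private EventTimeChoice() {}

    /**
     * Returns the epoch-millisecond timestamp to use for windowing.
     *
     * @param transactionTime the AtmTransaction timestamp, or null if it could not be read
     * @param partitionTime   the fallback value Kafka Streams passes as the second argument of extract()
     */
    static long chooseTimestamp(Date transactionTime, long partitionTime) {
        if (transactionTime != null && transactionTime.getTime() >= 0) {
            return transactionTime.getTime(); // event time: when the withdrawal actually happened
        }
        return partitionTime; // assumption: fall back instead of failing the application
    }
}
```

Using event time matters here because the 10-minute join window should measure the time between withdrawals, not the time at which the records happened to reach Kafka.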
AtmTransaction is the class representing the data in the topic, and
AtmTransactionSerde is its serde. To find two transactions on the same
account, we join the stream with a copy of itself. We create the copy
with an identity map operation.
final KStream<String, AtmTransaction> stream2 = stream1.map((KeyValue::pair));
Now we can join the two streams to produce JoinedAtmTransactions
records. For each transaction in the first stream, the join should
consider:
- all transactions in the second stream that happened within 10 minutes
after it;
- no transactions that happened before it.
This pairs each later transaction with the earlier one and ignores the
reverse pairing. Because both streams carry the same records, each
transaction is also paired with itself; the filter step below discards
those pairs.
final KStream<String, JoinedAtmTransactions> joinedStream = stream1.join(stream2,
JoinedAtmTransactions::new,
JoinWindows.of(Duration.ofMinutes(10)).before(Duration.ZERO));
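To make the window concrete: with JoinWindows.of(Duration.ofMinutes(10)).before(Duration.ZERO), a record from stream2 joins a record from stream1 with the same key when stream1.ts <= stream2.ts <= stream1.ts + 10 minutes, with both bounds inclusive. The plain-Java predicate below mirrors that rule for illustration only; it is not Kafka's implementation, and JoinWindowRule is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of the join window used in ATMFraud (not Kafka's code). */
final class JoinWindowRule {
    static final Duration AFTER = Duration.ofMinutes(10); // JoinWindows.of(Duration.ofMinutes(10))
    static final Duration BEFORE = Duration.ZERO;         // .before(Duration.ZERO)

    private JoinWindowRule() {}

    /** True if a stream2 record at other joins a stream1 record at current. */
    static boolean joins(Instant current, Instant other) {
        Instant lower = current.minus(BEFORE);
        Instant upper = current.plus(AFTER);
        // Inclusive bounds; a record always matches its own copy (other == current).
        return !other.isBefore(lower) && !other.isAfter(upper);
    }
}
```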
Joined Atm Transactions
I structured JoinedAtmTransactions as shown below. The
calculateDistanceInKM method calculates the great-circle distance
between the given coordinates using the haversine formula.
import static java.lang.Math.*;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Date;
import java.util.UUID;

public class JoinedAtmTransactions {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss Z")
    @JsonProperty("prev_timestamp")
    private Date prevTimestamp;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss Z")
    @JsonProperty("later_timestamp")
    private Date laterTimestamp;
    @JsonProperty("distance_between_trxns_KM")
    private double distanceBetweenTrxnsKM;
    @JsonProperty("millisecond_difference")
    private long millisecondDifference;
    @JsonProperty("minutes_difference")
    private double minutesDifference;
    @JsonProperty("KMH_required")
    private double KMHRequired;
    @JsonProperty("account_id")
    private String accountId;
    @JsonProperty("prev_transaction_id")
    private UUID prevTransactionId;
    @JsonProperty("later_transaction_id")
    private UUID laterTransactionId;
    @JsonProperty("prev_transaction_location")
    private Location prevTransactionLocation;
    @JsonProperty("later_transaction_location")
    private Location laterTransactionLocation;

    public JoinedAtmTransactions(final AtmTransaction prevTrxn, final AtmTransaction laterTrxn) {
        prevTimestamp = prevTrxn.getTimestamp();
        laterTimestamp = laterTrxn.getTimestamp();
        distanceBetweenTrxnsKM = calculateDistanceInKM(prevTrxn.getLocation().getLat(),
                prevTrxn.getLocation().getLon(),
                laterTrxn.getLocation().getLat(),
                laterTrxn.getLocation().getLon());
        // Elapsed time between the two transactions
        millisecondDifference = laterTrxn.getTimestamp().getTime() - prevTrxn.getTimestamp().getTime();
        minutesDifference = (double) millisecondDifference / (1000 * 60);
        // Speed needed to cover the distance in that time (Infinity or NaN if the timestamps are equal)
        KMHRequired = distanceBetweenTrxnsKM / (minutesDifference / 60);
        accountId = prevTrxn.getAccountId();
        prevTransactionId = prevTrxn.getTransactionId();
        laterTransactionId = laterTrxn.getTransactionId();
        prevTransactionLocation = prevTrxn.getLocation();
        laterTransactionLocation = laterTrxn.getLocation();
    }

    // Haversine formula; 6367 km is the Earth radius used here
    private double calculateDistanceInKM(double lat1, double lon1, double lat2, double lon2) {
        final double d2r = PI / 180; // degrees to radians
        final double dlat = (lat2 - lat1) * d2r;
        final double dlong = (lon2 - lon1) * d2r;
        final double a = pow(sin(dlat / 2), 2) + cos(lat1 * d2r) * cos(lat2 * d2r) * pow(sin(dlong / 2.0), 2);
        final double c = 2 * atan2(sqrt(a), sqrt(1 - a));
        return 6367 * c;
    }
    ...
}
Filter the join results
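We are now ready to filter joinedStream to find suspicious
transactions. The filter below drops a joined pair when both sides are
the same transaction, share the same timestamp, or come from the same
location, and flags every remaining pair. Note that it does not compare
the required speed against a limit, so the third criterion from the
approach section is not enforced here.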
public class ATMFraud {
    ...
    final KStream<String, JoinedAtmTransactions> filteredStream = joinedStream.filter((accountId, joinedTrxn) -> {
        final String ids = " Prev Trxn Id: " + joinedTrxn.getPrevTransactionId()
                + ", Later Trxn Id: " + joinedTrxn.getLaterTransactionId();
        if (joinedTrxn.getPrevTransactionId().equals(joinedTrxn.getLaterTransactionId())) {
            // The transaction was joined with its own copy
            System.out.println("Transaction IDs match, record will be skipped." + ids);
            return false;
        }
        if (joinedTrxn.getPrevTimestamp().equals(joinedTrxn.getLaterTimestamp())) {
            System.out.println("Transaction times match, record will be skipped." + ids);
            return false;
        }
        if (joinedTrxn.getPrevTransactionLocation().toString()
                .equals(joinedTrxn.getLaterTransactionLocation().toString())) {
            // Same ATM location: no travel involved
            System.out.println("Transaction locations match, record will be skipped." + ids);
            return false;
        }
        System.out.println("FRAUDULENT transaction found." + ids);
        return true;
    });
    ...
}
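The filter above flags every pair at different locations without comparing the required speed to a limit. If you want to enforce the third criterion from the approach section, one option is an extra speed check. The plain-Java sketch below shows such a predicate; the threshold MAX_PLAUSIBLE_KMH = 120 and all names are assumptions for illustration, not values from the project.

```java
/** Hypothetical speed check for the third criterion ("ATMs too far apart to reach in time"). */
final class SpeedCriterion {
    // Assumption: travelling between ATMs faster than this is treated as implausible.
    static final double MAX_PLAUSIBLE_KMH = 120.0;

    private SpeedCriterion() {}

    /**
     * @param distanceKm       distance between the two ATMs
     * @param millisDifference later timestamp minus earlier timestamp
     * @return true if covering the distance in that time would need an implausible speed
     */
    static boolean isSuspicious(double distanceKm, long millisDifference) {
        if (distanceKm <= 0) {
            return false; // same place: nothing to travel
        }
        if (millisDifference <= 0) {
            return true; // different places at the same instant
        }
        double hours = millisDifference / 3_600_000.0;
        return distanceKm / hours > MAX_PLAUSIBLE_KMH;
    }
}
```

In ATMFraud, such a check could become one more condition inside the filter lambda, fed from the joined record's distanceBetweenTrxnsKM and millisecondDifference fields. Unlike the filter above, this sketch flags two transactions at different ATMs with identical timestamps; keep the original skip if duplicate timestamps in your data are not meaningful.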
Entries in the filtered stream are suspicious transactions. We post them
to the my_atm_txns_fraudulent topic, where another application can
listen and take the necessary action. For demo purposes, I print these
transactions before posting them to my_atm_txns_fraudulent, using the
mapValues operation as below.
final KStream<String, JoinedAtmTransactions> printFraudTrxnsStream = filteredStream.mapValues(value -> {
System.out.println("--------------------------------------------------------------------------------------------");
System.out.println("Fraudulent transaction: " + value);
return value;
});
printFraudTrxnsStream.to("my_atm_txns_fraudulent", Produced.with(String(), JoinedAtmTransactionsSerde()));
Connect and start listening to messages
This last piece of code starts the application listening to the input
topic. We first build the topology of the streams and print it to the
console. Then we use the topology and properties to create a
KafkaStreams instance. The shutdown hook closes the streams before the
application exits. Finally, we start kafkaStreams and wait until the
program is asked to stop.
final Topology topology = streamsBuilder.build();
System.out.println(topology.describe());
final KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
final CountDownLatch latch = new CountDownLatch(1);
getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
kafkaStreams.close();
latch.countDown();
}
});
try {
kafkaStreams.start();
latch.await();
} catch (Throwable t) {
System.exit(1);
}
System.exit(0);
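The latch is what keeps main() blocked until shutdown. As a hedged, Kafka-free sketch of the same pattern, the helper below builds a hook that closes any AutoCloseable and then releases the latch, counting down in a finally block so main() is released even if close() throws. GracefulShutdown and hookFor are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical generalization of the shutdown-hook pattern used in ATMFraud's main method. */
final class GracefulShutdown {
    private GracefulShutdown() {}

    /** Builds (but does not register) a hook that closes the resource, then releases the latch. */
    static Thread hookFor(AutoCloseable resource, CountDownLatch latch) {
        return new Thread(() -> {
            try {
                resource.close(); // e.g. kafkaStreams::close
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                latch.countDown(); // lets the main thread's latch.await() return
            }
        }, "streams-shutdown-hook");
    }
}
```

In ATMFraud it could replace the anonymous Thread subclass: getRuntime().addShutdownHook(GracefulShutdown.hookFor(kafkaStreams::close, latch));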
Result
The application can be run with the command mvn clean package exec:java
-Dexec.mainClass=myapps.ATMFraud. The image below shows the output you
should get when the ATM transactions shown are posted to Kafka. The
console at the top runs the Kafka console consumer, listening to the
source topic my_atm_txns_gess. The console at the bottom does the same
for the result topic my_atm_txns_fraudulent.
Four transactions are posted to the source topic, of which the last two
are on the same account and fit the suspicious-activity criteria. As the
bottom console shows, the application correctly identified the
suspicious activity and posted it to the result topic.