Automate Everything - Automate NSQ Statistic reporting with Haskell
Several months ago, I was given the task of making a report consisting of statistics for several NSQ topics. The job is not really hard: we just need to check the NSQ statistics every hour and write them down in a shared Excel file. After checking manually several times, I thought I could automate it easily. So, I made a script to automate most of it using Go. You can find my code right here. The code just works; I haven't checked it again since I made it, so forgive me if it looks bad.
Well, the title is about Haskell, so let's get into it. After I made that Go project, I thought that maybe I could continue learning Haskell by making several small projects like nsq-message-checker-go. Here we go: this is how I made the Haskell version of nsq-message-checker-go. You can find the completed code on GitLab.
main :: IO ()
main = echo "Let's code some Haskell"
What we need
- Stack. I don't know any other way to code Haskell without Stack, so bear with me.
- Intero. This is the best interactive development tool for Haskell; you can complain to the person who influenced me if you have a different opinion.
- A text editor that can use Intero painlessly. I use VS Code with the Haskero plugin, but you can also use Vim or Emacs. Emacs works out of the box, but if you can integrate Vim/NeoVim with Intero, I would really appreciate it if you told me how.
Start Coding
Project Setup
Let's create a new project using stack.
❯ stack new nsq-message-checker
After the project is successfully created, we should try to build the template project.
# Do not forget to run `stack setup` first
❯ stack setup
❯ stack build
# output
...
Registering library for nsq-message-checker-0.1.0.0..
If you see that kind of line at the end of the output, it means that the build succeeded.
Intero Setup
We should check whether Intero works properly. Intero will be downloaded and installed automatically if we use Emacs with the Haskell layer, but because we are using VS Code, we should build it first using Stack.
❯ stack build intero
# output
haskeline-0.7.4.2: using precompiled package
intero-0.1.31: download
intero-0.1.31: configure
intero-0.1.31: build
intero-0.1.31: copy/register
Completed 2 action(s).
After we reload VS Code, Haskero will be initialized automatically and run Intero in the background.
Misc. Files
I also added or updated several files manually before I started coding. You can see these files in the repository. These files are:
- .editorconfig
- .gitignore
- .env.example
- Makefile
I use a .env file for the configuration. That's why I also use the dotenv library to parse it.
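Once the .env file is loaded, its entries are ordinary environment variables. Here is a minimal sketch, using only base, of reading one and getting a readable message when it is missing. `fromLookup` and `requireEnv` are hypothetical helpers that are not part of the repository; `NSQ_STAT_URL` is one of the variable names used later in this post.

```haskell
import System.Environment (lookupEnv)

-- | Hypothetical helper: turn a lookup result into either a value or a
-- message naming the variable that is missing.
fromLookup :: String -> Maybe String -> Either String String
fromLookup name = maybe (Left ("missing environment variable: " ++ name)) Right

-- | Hypothetical helper: read a variable without throwing when it is unset.
requireEnv :: String -> IO (Either String String)
requireEnv name = fromLookup name <$> lookupEnv name

main :: IO ()
main = do
  result <- requireEnv "NSQ_STAT_URL"
  case result of
    Left err  -> putStrLn err
    Right url -> putStrLn ("statistic URL: " ++ url)
```

The point of the design is only that a missing variable becomes a value we can report, instead of an exception in the middle of the run.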
Command Line Argument
My idea is to read the topics from the command line. So, to make a report for topic X we can use this kind of command:
❯ stack exec nsq-message-checker -- topic X
We can also make a report for all topics at once like this:
❯ stack exec nsq-message-checker -- topic all
To support that style, we will use the turtle library and put the functions in Main.hs:
-- Main.hs
{-# LANGUAGE OverloadedStrings #-}
module Main where
import NSQ
import Protolude
import Turtle
-- dotenv
import qualified Configuration.Dotenv as Dotenv
import Configuration.Dotenv.Types
-- main parser
parser :: Parser (IO ())
parser = parseMainCommand
     <|> parseProcessNsqTopic
-- main command
parseMainCommand :: Parser (IO ())
parseMainCommand = pure mainCommand
mainCommand :: IO ()
mainCommand = echo "Use \"-h\" argument for help"
-- topic subcommand
-- stack exec nsq-message-checker -- topic all
-- stack exec nsq-message-checker -- topic optimization_rule
parseProcessNsqTopic :: Parser (IO ())
parseProcessNsqTopic =
  fmap
    processNsq
    (subcommand
       "topic"
       "Process specified topic"
       (argText "topic_name" "What topic should be processed? (all for all topics)"))
-- main
main :: IO ()
main = do
  _ <- Dotenv.loadFile defaultConfig
  cmd <- options "NSQ Statistic Checker" parser
  cmd
As you can see, the real work is done by processNsq, which accepts the topic name as its argument.
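To make the dispatch easier to see without turtle, here is a minimal sketch of the same idea using only base. It is not the code from the repository: the `Command` type and `parseArgs` are names I made up for illustration, and it handles only the two shapes of input described above.

```haskell
import System.Environment (getArgs)

-- | The two things the program can do (illustrative names).
data Command
  = ShowHelp            -- ^ No subcommand was given.
  | ProcessTopic String -- ^ "topic <name>", where <name> may be "all".
  deriving (Eq, Show)

-- | Decide what to do from the raw argument list.
parseArgs :: [String] -> Command
parseArgs ["topic", name] = ProcessTopic name
parseArgs _               = ShowHelp

main :: IO ()
main = do
  args <- getArgs
  case parseArgs args of
    ShowHelp          -> putStrLn "Use \"-h\" argument for help"
    ProcessTopic name -> putStrLn ("would process topic: " ++ name)
```

Turtle gives us the help text and argument descriptions for free, which is why the real project uses it instead of matching on the list by hand.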
Code Structure
Honestly, I have not found any best-practice example for organizing this code, but my mentor said that I can just put the code directly in the src folder as NSQ.hs. I also separated the type-related code into Types.hs.
❯ tree
.
├── src
│   ├── NSQ.hs
│   └── Types.hs
...
Types.hs
After analyzing the response from the NSQ statistic endpoint and the output that I expected, I made 5 data types and put them in Types.hs. For the record, the output will be written into a CSV file; that's why I use the cassava library in this project.
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
module Types where
import Data.Aeson
import Data.Aeson.Casing (aesonPrefix, snakeCase)
import Data.Text as T
import Protolude
-- cassava
import Data.Csv (FromRecord, ToNamedRecord, ToRecord)
import qualified Data.Csv as Cassava
-- | Client data type.
data Client = Client
  { clientClientId :: Text -- ^ Client ID.
  , clientHostname :: Text -- ^ Hostname of the client.
  , clientVersion :: Text -- ^ Client version.
  , clientRemoteAddress :: Text -- ^ Remote address of client.
  , clientState :: Int -- ^ State of client.
  , clientReadyCount :: Int -- ^ Number of messages the client is ready to receive.
  , clientInFlightCount :: Int -- ^ Number of in-flight messages.
  , clientMessageCount :: Int -- ^ Number of messages.
  , clientFinishCount :: Int -- ^ Number of finished messages.
  , clientRequeueCount :: Int -- ^ Number of requeued messages.
  , clientConnectTs :: Int
  , clientSampleRate :: Int
  , clientDeflate :: Bool
  , clientSnappy :: Bool
  , clientUserAgent :: Text
  , clientTls :: Bool
  , clientTlsCipherSuite :: Text
  , clientTlsVersion :: Text
  , clientTlsNegotiatedProtocol :: Text
  , clientTlsNegotiatedProtocolIsMutual :: Bool
  } deriving (Show, Generic)
instance ToJSON Client where
  toJSON = genericToJSON $ aesonPrefix snakeCase
instance FromJSON Client where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase
-- | Data type for channel.
data Channel = Channel
  { channelChannelName :: Text -- ^ The name of the channel.
  , channelDepth :: Int
  , channelBackendDepth :: Int
  , channelInFlightCount :: Int -- ^ Number of in-flight messages.
  , channelDeferredCount :: Int -- ^ Number of deferred messages.
  , channelMessageCount :: Int -- ^ Number of messages.
  , channelRequeueCount :: Int -- ^ Number of requeued messages.
  , channelTimeoutCount :: Int
  , channelClients :: [Client]
  , channelPaused :: Bool
  } deriving (Show, Generic)
instance ToJSON Channel where
  toJSON = genericToJSON $ aesonPrefix snakeCase
instance FromJSON Channel where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase
data Topic = Topic
  { topicTopicName :: Text
  , topicChannels :: [Channel]
  , topicDepth :: Int
  , topicBackendDepth :: Int
  , topicMessageCount :: Int
  , topicPaused :: Bool
  } deriving (Show, Generic)
instance ToJSON Topic where
  toJSON = genericToJSON $ aesonPrefix snakeCase
instance FromJSON Topic where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase
data Statistic = Statistic
  { statisticVersion :: Text
  , statisticHealth :: Text
  , statisticTopics :: [Topic]
  , statisticStartTime :: Int
  } deriving (Show, Generic)
instance ToJSON Statistic where
  toJSON = genericToJSON $ aesonPrefix snakeCase
instance FromJSON Statistic where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase
data Report = Report
  { reportTopicName :: Text
  , reportMessage :: Int
  , reportTotalMessage :: Int
  , reportConnection :: Int
  , reportDate :: Text
  , reportTime :: Text
  } deriving (Generic, Show)
instance FromRecord Report
instance ToRecord Report
instance ToNamedRecord Report where
  toNamedRecord Report {..} =
    Cassava.namedRecord
      [ "Topic Name" Cassava..= reportTopicName
      , "Message" Cassava..= reportMessage
      , "Total Message" Cassava..= reportTotalMessage
      , "Connection" Cassava..= reportConnection
      , "Date" Cassava..= reportDate
      , "Time" Cassava..= reportTime
      ]
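The doubled-looking field names in the JSON types above (such as `topicTopicName`) exist because `aesonPrefix snakeCase` strips the lowercase record prefix and converts the rest to snake case. The sketch below is my own approximation of that mapping using only base, not the library's source; `dropPrefix`, `toSnake` and `fieldToKey` are illustrative names.

```haskell
import Data.Char (isUpper, toLower)

-- | Drop the lowercase record prefix,
-- e.g. "clientInFlightCount" becomes "InFlightCount".
dropPrefix :: String -> String
dropPrefix = dropWhile (not . isUpper)

-- | Lowercase every capital letter and put '_' before each one except the first.
toSnake :: String -> String
toSnake []       = []
toSnake (c : cs) = toLower c : concatMap step cs
  where
    step x
      | isUpper x = ['_', toLower x]
      | otherwise = [x]

-- | Approximate JSON key for a record field name.
fieldToKey :: String -> String
fieldToKey = toSnake . dropPrefix

main :: IO ()
main = mapM_ (putStrLn . fieldToKey)
  [ "clientInFlightCount" -- prints in_flight_count
  , "topicTopicName"      -- prints topic_name
  , "statisticStartTime"  -- prints start_time
  ]
```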
NSQ.hs
As I said before, the main logic lives in the processNsq function. This function holds the flow of the process, kept as general as possible. This is the process flow:
- Prepare the NSQ statistic URL.
- Look up the statistic using that URL and decode it.
- Process the decoded statistic.
- Send a notification to Slack to notify us about the latest statistic.
Based on the process flow above, I will describe each function and its child functions (if any), following the code hierarchy. You can find the final processNsq function below:
-- NSQ.hs
-- | Process NSQ `Topic`.
processNsq :: Text -> IO ()
processNsq topicName = do
  nsqStatisticURL <- SE.getEnv "NSQ_STAT_URL"
  let statisticURL topic channel =
        nsqStatisticURL
          ++ "?format=json&topic="
          ++ topic
          ++ "&channel="
          ++ channel
  -- we hardcode the channel name for now (which is "job"), but we can also
  -- send it together with topic name as an argument
  jsonString <- lookupNsqStatistic $ statisticURL (unpack topicName) "job"
  let statistic = eitherDecode jsonString :: Either String Statistic
  case statistic of
    Left err   -> putStrLn err
    Right stat -> processTopicStatistic stat topicName
  sendSlackNotification
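The URL building inside processNsq is plain string concatenation, so it can be tried on its own. A minimal sketch with base only; `buildStatsUrl` is an illustrative name, and the base URL in `main` is a placeholder for whatever `NSQ_STAT_URL` holds in your environment.

```haskell
-- | Build the statistic URL from a base URL, a topic and a channel.
-- No URL escaping is performed, exactly like the concatenation in processNsq.
buildStatsUrl :: String -> String -> String -> String
buildStatsUrl base topic channel =
  base ++ "?format=json&topic=" ++ topic ++ "&channel=" ++ channel

main :: IO ()
main =
  -- Placeholder base URL; the real one comes from the environment.
  putStrLn (buildStatsUrl "http://127.0.0.1:4151/stats" "optimization_rule" "job")
  -- prints http://127.0.0.1:4151/stats?format=json&topic=optimization_rule&channel=job
```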
We prepare the NSQ statistic URL with the statisticURL function. After that, we look up the statistic using the lookupNsqStatistic function:
-- NSQ.hs
-- | Lookup `Topic`'s `Statistic`.
lookupNsqStatistic :: String -> IO BSL.ByteString
lookupNsqStatistic url = simpleHttp url
We decode the response using eitherDecode from Aeson and, if decoding succeeds, process it using processTopicStatistic.
-- NSQ.hs
-- | Process `Topic`'s `Statistic` and generate `Report` file.
processTopicStatistic :: Statistic -> Text -> IO ()
processTopicStatistic statistic topicName = do
  topicReports <- breakdownTopicReport (statisticTopics statistic) topicName
  reportToCSV $ L.foldl' (++) [] topicReports
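The fold in the last line only flattens a list of lists into one list before it is written out. A small sketch with made-up data shows that it gives the same result as the standard `concat`; `flattenWithFold` is an illustrative name.

```haskell
import Data.List (foldl')

-- | Flatten per-topic lists into a single list, the same way
-- processTopicStatistic does.
flattenWithFold :: [[a]] -> [a]
flattenWithFold = foldl' (++) []

main :: IO ()
main = do
  -- Made-up stand-ins for the per-topic report lists.
  let perTopic = [["a1", "a2"], [], ["b1"]]
  print (flattenWithFold perTopic)                    -- ["a1","a2","b1"]
  print (flattenWithFold perTopic == concat perTopic) -- True
```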
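processTopicStatistic relies on the breakdownTopicReport function to break the statistic down into our own data type before we put it into the CSV file. breakdownTopicReport accepts 2 arguments: a list of Topic and the topic name itself. If the topic name is "all", it iterates over every name in topicNames (a list whose definition is not shown in this post); otherwise it handles only the given name. Either way, the result is an IO action that yields one list of Report per topic, which we bind as topicReports.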
-- NSQ.hs
-- | Map over `Topic` and return list of `Report`s.
breakdownTopicReport :: [Topic] -> Text -> IO [[Report]]
breakdownTopicReport topics topicName
  | topicName == "all" = do
      mapM (iterateTopicName topics) topicNames
  | otherwise = do
      mapM (iterateTopicName topics) [topicName]
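The two guards differ only in which names they iterate over. Here is a sketch of that choice as a pure function, with base only. `knownTopics` is a hypothetical stand-in for the list of topic names, and every name in it except optimization_rule is a placeholder I invented.

```haskell
-- | Hypothetical stand-in for the list of known topic names.
knownTopics :: [String]
knownTopics = ["optimization_rule", "placeholder_topic_a", "placeholder_topic_b"]

-- | Pick the topic names to report on: every known topic for "all",
-- otherwise just the one that was asked for.
targetTopics :: String -> [String]
targetTopics "all" = knownTopics
targetTopics name  = [name]

main :: IO ()
main = do
  print (targetTopics "all")
  print (targetTopics "optimization_rule") -- ["optimization_rule"]
```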
iterateTopicName populates the reports for the specified topic name and returns them as a list of Report.
-- NSQ.hs
-- | Populate `Report` for specified `Topic`.
iterateTopicName :: [Topic] -> Text -> IO [Report]
iterateTopicName topics topicName = do
  let topic = filterTopicName topics topicName
      channels = case topic of
        Just t  -> topicChannels t :: [Channel]
        Nothing -> []
  populateReport topic channels
-- | Get the first `Topic` with the given name from a list of `Topic`.
filterTopicName :: [Topic] -> Text -> Maybe Topic
filterTopicName topics topicName =
  L.find (\x -> topicTopicName x == topicName) topics
-- | Map over the `Channel`s to get the `Report`s.
populateReport :: Maybe Topic -> [Channel] -> IO [Report]
populateReport Nothing _ = return []
populateReport (Just topic) chans = mapM (populRep topic) chans
  where
    populRep t channel = do
      formattedDate <- getFormattedDate
      formattedTime <- getFormattedTime
      return $ Report (topicTopicName t)
                      (channelDepth channel)
                      (channelMessageCount channel)
                      (L.length $ channelClients channel)
                      formattedDate
                      formattedTime
-- | Get date which has been formatted into string.
getFormattedDate :: IO Text
getFormattedDate = do
  currentLocalTime <- getZonedTime
  return $ T.pack $ formatTime defaultTimeLocale "%Y-%m-%d" currentLocalTime
-- | Get time which has been formatted into string.
getFormattedTime :: IO Text
getFormattedTime = do
  currentLocalTime <- getZonedTime
  return $ T.pack $ formatTime defaultTimeLocale "%H:%M" currentLocalTime
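The code above reads the clock twice per report, once for the date and once for the time. A possible variation, sketched here with the time library that ships with GHC, reads it once and formats both parts from the same value, so the two columns can never straddle midnight. `formatStamp` is an illustrative name and is not in the repository.

```haskell
import Data.Time
  ( FormatTime, UTCTime (..), defaultTimeLocale, formatTime
  , fromGregorian, getZonedTime, secondsToDiffTime )

-- | Format one timestamp as (date, time), using the same format strings
-- as getFormattedDate and getFormattedTime.
formatStamp :: FormatTime t => t -> (String, String)
formatStamp t =
  ( formatTime defaultTimeLocale "%Y-%m-%d" t
  , formatTime defaultTimeLocale "%H:%M" t
  )

main :: IO ()
main = do
  -- A fixed instant (13:07 UTC), so the output format is easy to see.
  print (formatStamp (UTCTime (fromGregorian 2018 3 5) (secondsToDiffTime 47220)))
  -- prints ("2018-03-05","13:07")
  now <- getZonedTime
  print (formatStamp now)
```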
Back to the processTopicStatistic function: we need to prepare the header of the CSV file and pass it to the encoder inside our reportToCSV function. We get the file location from the environment.
-- NSQ.hs
-- | Put `Report`s into csv file.
reportToCSV :: [Report] -> IO ()
reportToCSV reports = do
  csvFile <- SE.getEnv "CSV_FILE"
  BSL.appendFile csvFile $ Cassava.encodeByName reportHeader reports
-- | Header columns used when encoding a `Report`.
reportHeader :: Header
reportHeader = Vector.fromList
  ["Topic Name", "Message", "Total Message", "Connection", "Date", "Time"]
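One thing to be aware of: as far as I understand cassava, encodeByName emits the header row every time, so appending on every run repeats the header inside the file. The sketch below shows the "header only for a new file" idea with base and the directory library only. It is a simplified stand-in, not the project's code: `renderCsv` and `appendRows` are hypothetical names, the file name is a placeholder, and the CSV rendering is naive (no quoting). In cassava itself, `encodeByNameWith` with `encIncludeHeader` set to `False` looks like the matching switch, but treat that as an assumption to verify.

```haskell
import Data.List (intercalate)
import System.Directory (doesFileExist)

-- | Render rows as naive CSV (no quoting), with the header only when asked.
renderCsv :: Bool -> [String] -> [[String]] -> String
renderCsv withHeader header rows =
  unlines (map (intercalate ",") ([header | withHeader] ++ rows))

-- | Append rows to a file, writing the header only if the file is new.
appendRows :: FilePath -> [String] -> [[String]] -> IO ()
appendRows path header rows = do
  exists <- doesFileExist path
  appendFile path (renderCsv (not exists) header rows)

main :: IO ()
main = do
  let header = ["Topic Name", "Message", "Date"]
      path   = "report-sketch.csv" -- placeholder file name
  appendRows path header [["optimization_rule", "12", "2018-03-05"]]
  appendRows path header [["optimization_rule", "9", "2018-03-06"]]
  readFile path >>= putStr
```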
So, we have already looked up the statistic from NSQ, then decoded and mapped it into our data type. We have also finished writing it into the CSV file. The last step is sending a notification to Slack to tell us about our new CSV file. We will use a Slack webhook to ease the process. To do this, we need to prepare an object to be sent to our webhook URL. Do not forget to encode the object beforehand.
-- NSQ.hs
-- | Send notification to slack channel using web hook.
sendSlackNotification :: IO ()
sendSlackNotification = do
  manager <- newManager tlsManagerSettings
  let requestObject = object
        [ "name" .= ("NSQ Statistic Checker" :: String)
        , "text" .= ("NSQ Statistic Report has been compiled" :: String)
        ]
  slackURL <- SE.getEnv "SLACK_URL"
  initialRequest <- parseRequest slackURL
  let request = initialRequest
        { method = "POST"
        , requestHeaders = [("Content-Type", "application/json")]
        , requestBody = RequestBodyLBS $ encode requestObject
        }
  response <- httpLbs request manager
  putStrLn $ responseBody response
How to use it?
I already provide a Makefile. So, you can just build the project and execute the run target of the Makefile. You can also copy the command and use the custom argument which we made before to process a specific topic.
Build the project
❯ make build
Run the executable
Run it with make:
❯ make run
Run it with stack:
❯ stack exec nsq-message-checker -- topic optimization_rule
That's all that I can share for now. I hope it will benefit anyone who reads it in the future. Thank you very much to my mentor and to the reader. Forgive me for the crappy post.