11{-# language BangPatterns #-}
2+ {-# language LambdaCase #-}
23module Main where
34
45import Data.ByteString.Lazy (toStrict )
56import qualified Data.ByteString.Lazy as BL
67import qualified Data.ByteString as B
78import Data.ByteString.Builder (toLazyByteString )
8- import Data.ByteString (ByteString )
99import Data.Vector as V (fromList , empty )
1010import Data.IORef
1111import Data.Int
@@ -15,22 +15,15 @@ import Control.Concurrent
1515import Control.Applicative
1616import Control.Monad
1717import Data.Monoid
18- import Control.DeepSeq
19- import System.IO.Unsafe
2018import System.Clock
19+ import Options.Applicative
2120
2221import qualified Database.PostgreSQL.LibPQ as LibPQ
2322
2423import Database.PostgreSQL.Protocol.Types
25- import Database.PostgreSQL.Protocol.Encoders
26- import Database.PostgreSQL.Protocol.Decoders
2724import Database.PostgreSQL.Protocol.DataRows
28- import Database.PostgreSQL.Protocol.Store.Decode
29- import Database.PostgreSQL.Protocol.Codecs.Decoders
30- import Database.PostgreSQL.Driver.Connection
3125import Database.PostgreSQL.Driver
32- import Criterion.Main
33-
26+ --
3427-- CREATE TABLE _bytes_100_of_1k(b bytea);
3528-- CREATE TABLE _bytes_400_of_200(b bytea);
3629-- CREATE TABLE _bytes_10_of_20k(b bytea);
@@ -39,69 +32,107 @@ import Criterion.Main
3932
4033-- INSERT INTO _bytes_100_of_1k(b)
4134-- (SELECT repeat('a', 1000)::bytea FROM generate_series(1, 100));
35+ --
4236-- INSERT INTO _bytes_400_of_200(b)
4337-- (SELECT repeat('a', 200)::bytea FROM generate_series(1, 400));
38+ --
4439-- INSERT INTO _bytes_10_of_20k(b)
4540-- (SELECT repeat('a', 20000)::bytea FROM generate_series(1, 10));
41+ --
4642-- INSERT INTO _bytes_1_of_200(b) VALUES(repeat('a', 200)::bytea);
43+ --
4744-- INSERT INTO _bytes_300_of_100(b)
4845-- (SELECT repeat('a', 100)::bytea FROM generate_series(1, 300));
4946
50- main = defaultMain
51- -- [ bgroup "Requests"
52- -- [
53- -- -- env createConnection (\c -> bench "100 of 1k" . nfIO $ requestAction c)
54- -- bench "parser" $ nf parse bs
55- -- ]
56- -- ]
57-
58- -- benchDataRowDecoder d bs = decodeManyRows d $
59- -- DataRows (DataChunk 380 bs) Empty
60- -- where
61- -- decodeDataRow = do
62- -- (Header _ len) <- decodeHeader
63- -- getByteString len
64-
65- {-# NOINLINE bs #-}
66- bs :: B. ByteString
67- bs = unsafePerformIO $ B. readFile " 1.txt"
68-
69- benchLoop :: IO ()
70- benchLoop = do
71- ref <- newIORef 0 :: IO (IORef Word )
72- rbs <- newIORef " " :: IO (IORef BL. ByteString )
73- ! bs <- B. readFile " 1.txt"
74- let str = BL. cycle $ BL. fromStrict bs
75- writeIORef rbs str
76-
77- let handler dm = case dm of
78- DataMessage _ -> modifyIORef' ref (+ 1 )
79- _ -> pure ()
80- newChunk preBs = do
81- b <- readIORef rbs
82- let (nb, rest) = BL. splitAt 4096 b
83- writeIORef rbs rest
84- -- let res = preBs <> (B.copy $ BL.toStrict nb)
85- let res = preBs <> ( BL. toStrict nb)
86- res `seq` pure res
87- tid <- forkIO $ forever $ loopExtractDataRows newChunk handler
88- threadDelay 1000000
89- killThread tid
90- s <- readIORef ref
91- print $ " Requests: " ++ show s
47+ data Action
48+ = BenchPW RowsType
49+ | BenchLibPQ RowsType
50+ | BenchLoop
51+ deriving (Show , Eq )
52+
53+ data RowsType
54+ = Bytes100_1k
55+ | Bytes400_200
56+ | Bytes10_20k
57+ | Bytes1_200
58+ | Bytes300_100
59+ deriving (Show , Eq )
60+
61+ cli :: Parser Action
62+ cli = hsubparser $
63+ cmd " pw" " benchmark postgres-wire" (BenchPW <$> rowTypeParser)
64+ <> cmd " libpq" " benchmark libpq" (BenchLibPQ <$> rowTypeParser)
65+ <> cmd " loop" " benchmark datarows decoding loop" (pure BenchLoop )
66+ where
67+ cmd c h p = command c (info (helper <*> p) $ header h)
68+ rowTypeParser = hsubparser $
69+ cmd " b100_1k" " 100 rows of 1k bytes" (pure Bytes100_1k )
70+ <> cmd " b400_200" " 400 rows of 200 bytes" (pure Bytes400_200 )
71+ <> cmd " b10_20k" " 10 rows of 20k bytes" (pure Bytes10_20k )
72+ <> cmd " b1_200" " 1 row of 200 bytes" (pure Bytes1_200 )
73+ <> cmd " b300_100" " 300 rows of 100 bytes" (pure Bytes300_100 )
74+
75+ main :: IO ()
76+ main = execParser (info (helper <*> cli) $ header " Postgres-wire benchmark" )
77+ >>= execAction
78+
79+ execAction :: Action -> IO ()
80+ execAction (BenchPW rows) = benchPw $ queryStatement rows
81+ execAction (BenchLibPQ rows) = benchLibpq $ queryStatement rows
82+ execAction BenchLoop = benchLoop
83+
84+ queryStatement :: RowsType -> B. ByteString
85+ queryStatement = \ case
86+ Bytes100_1k -> " SELECT * from _bytes_100_of_1k"
87+ Bytes400_200 -> " SELECT * from _bytes_400_of_200"
88+ Bytes10_20k -> " SELECT * from _bytes_10_of_20k"
89+ Bytes1_200 -> " SELECT * fromm _bytes_1_of_200"
90+ Bytes300_100 -> " SELECT * from _bytes_300_of_100"
91+
92+ benchPw :: B. ByteString -> IO ()
93+ benchPw statement = benchRequests createConnection $ \ c -> do
94+ sendBatchAndSync c [q]
95+ d <- readNextData c
96+ waitReadyForQuery c
97+ where
98+ q = Query statement V. empty Binary Binary AlwaysCache
99+ createConnection = connect defaultSettings >>=
100+ either (error . (" Connection error " <> ) . show ) pure
101+
102+ defaultSettings = defaultConnectionSettings
103+ { settingsHost = " localhost"
104+ , settingsDatabase = " travis_test"
105+ , settingsUser = " postgres"
106+ , settingsPassword = " "
107+ }
108+
109+ benchLibpq :: B. ByteString -> IO ()
110+ benchLibpq statement = benchRequests libpqConnection $ \ c -> do
111+ r <- fromJust <$> LibPQ. execPrepared c " " [] LibPQ. Binary
112+ rows <- LibPQ. ntuples r
113+ parseRows r (rows - 1 )
114+ where
115+ libpqConnection = do
116+ conn <- LibPQ. connectdb " host=localhost user=postgres dbname=travis_test"
117+ LibPQ. prepare conn " " " SELECT * from _bytes_300_of_100" Nothing
118+ pure conn
119+ parseRows r (- 1 ) = pure ()
120+ parseRows r n = LibPQ. getvalue r n 0 >> parseRows r (n - 1 )
92121
93122benchRequests :: IO c -> (c -> IO a ) -> IO ()
94123benchRequests connectAction queryAction = do
95- rs <- replicateM 8 newThread
96- threadDelay $ 2 * 1000000
97- traverse (\ (_,_, tid) -> killThread tid) rs
98- s <- sum <$> traverse (\ (ref, _, _) -> readIORef ref) rs
99- latency_total <- sum <$> traverse (\ (_, ref, _) -> readIORef ref) rs
100- print $ " Requests: " ++ show s
101- print $ " Average latency: " ++ show (latency_total `div` fromIntegral s)
124+ results <- replicateM 8 newThread
125+ threadDelay $ durationSeconds * 1000 * 1000
126+ for_ results $ \ (_, _, tid) -> killThread tid
127+ s <- sum <$> traverse (\ (ref, _, _) -> readIORef ref) results
128+ latency_total <- sum <$> traverse (\ (_, ref, _) -> readIORef ref) results
129+
130+ print $ " Requests per second: " ++ show (s `div` durationSeconds)
131+ print $ " Average latency, ms: " ++ displayLatency latency_total s
102132 where
133+ durationSeconds = 10
103134 newThread = do
104- ref_count <- newIORef 0 :: IO (IORef Word )
135+ ref_count <- newIORef 0 :: IO (IORef Int )
105136 ref_latency <- newIORef 0 :: IO (IORef Int64 )
106137 c <- connectAction
107138 tid <- forkIO $ forever $ do
@@ -113,82 +144,33 @@ benchRequests connectAction queryAction = do
113144 modifyIORef' ref_count (+ 1 )
114145 pure (ref_count, ref_latency, tid)
115146
116- getDifference (TimeSpec end_s end_ns) (TimeSpec start_s start_ns) =
117- (end_s - start_s) * 1000000000 + end_ns - start_ns
147+ getDifference (TimeSpec end_s end_ns) (TimeSpec start_s start_ns) =
148+ (end_s - start_s) * 1000000000 + end_ns - start_ns
118149
119- requestAction c = replicateM_ 100 $ do
120- sendBatchAndSync c [q]
121- readNextData c
122- waitReadyForQuery c
123- where
124- q = Query largeStmt V. empty Binary Binary AlwaysCache
125- largeStmt = " SELECT * from _bytes_1_of_200"
126-
127- benchMultiPw :: IO ()
128- benchMultiPw = benchRequests createConnection $ \ c -> do
129- sendBatchAndSync c [q]
130- d <- readNextData c
131- waitReadyForQuery c
132- where
133- q = Query largeStmt V. empty Binary Binary AlwaysCache
134- largeStmt = " SELECT * from _bytes_300_of_100"
135- -- largeStmt = "select typname, typnamespace, typowner, typlen, typbyval,"
136- -- <> "typcategory, typispreferred, typisdefined, typdelim,"
137- -- <> "typrelid, typelem, typarray from pg_type"
138-
139- benchLibpq :: IO ()
140- benchLibpq = benchRequests libpqConnection $ \ c -> do
141- r <- fromJust <$> LibPQ. execPrepared c " " [] LibPQ. Binary
142- rows <- LibPQ. ntuples r
143- go r (rows - 1 )
144- where
145- libpqConnection = do
146- conn <- LibPQ. connectdb " host=localhost user=postgres dbname=travis_test"
147- LibPQ. prepare conn " " " SELECT * from _bytes_300_of_100" Nothing
148- pure conn
149- go r (- 1 ) = pure ()
150- go r n = LibPQ. getvalue r n 0 >> go r (n - 1 )
151-
152-
153- -- Connection
154- -- | Creates connection with default filter.
155- createConnection :: IO Connection
156- createConnection = getConnection <$> connect defaultSettings
157-
158- getConnection :: Either Error Connection -> Connection
159- getConnection (Left e) = error $ " Connection error " ++ show e
160- getConnection (Right c) = c
161-
162- defaultSettings = defaultConnectionSettings
163- { settingsHost = " localhost"
164- , settingsDatabase = " travis_test"
165- , settingsUser = " postgres"
166- , settingsPassword = " "
167- }
168-
169- -- Orphans
170-
171- instance NFData (AbsConnection a ) where
172- rnf _ = ()
173-
174- instance NFData Error where
175- rnf _ = ()
176-
177- instance (NFData a1 , NFData a2 , NFData a3 , NFData a4 , NFData a5 , NFData a6 , NFData a7 , NFData a8 , NFData a9 , NFData a10 , NFData a11 , NFData a12 ) =>
178- NFData (a1 , a2 , a3 , a4 , a5 , a6 , a7 , a8 , a9 , a10 , a11 , a12 ) where
179- rnf (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12) =
180- rnf x1 `seq`
181- rnf x2 `seq`
182- rnf x3 `seq`
183- rnf x4 `seq`
184- rnf x5 `seq`
185- rnf x6 `seq`
186- rnf x7 `seq`
187- rnf x8 `seq`
188- rnf x9 `seq`
189- rnf x10 `seq`
190- rnf x11 `seq`
191- rnf x12
192-
193- instance NFData (Decode a ) where
194- rnf ! d = ()
150+ displayLatency latency reqs =
151+ let a = latency `div` fromIntegral reqs
152+ (ms, ns) = a `divMod` 1000000
153+ in show ms <> " ." <> show ns
154+
155+ benchLoop :: IO ()
156+ benchLoop = do
157+ counter <- newIORef 0 :: IO (IORef Word )
158+ content <- newIORef " " :: IO (IORef BL. ByteString )
159+ -- TODO read file
160+ ! bs <- B. readFile " 1.txt"
161+ writeIORef content . BL. cycle $ BL. fromStrict bs
162+
163+ let handler dm = case dm of
164+ DataMessage _ -> modifyIORef' counter (+ 1 )
165+ _ -> pure ()
166+ newChunk preBs = do
167+ b <- readIORef content
168+ let (nb, rest) = BL. splitAt 4096 b
169+ writeIORef content rest
170+ let res = preBs <> ( BL. toStrict nb)
171+ res `seq` pure res
172+ tid <- forkIO . forever $ loopExtractDataRows newChunk handler
173+ threadDelay 10000000
174+ killThread tid
175+ s <- readIORef counter
176+ print $ " Data messages parsed: " ++ show s
0 commit comments