Browse Source

Support for multi-threaded sniffer. Add support for atomic operations instead of mutex in wc_port.h.

David Garske 1 year ago
parent
commit
de22dbe61d
10 changed files with 775 additions and 232 deletions
  1. 1 0
      CMakeLists.txt
  2. 1 0
      Makefile.am
  3. 2 1
      configure.ac
  4. 35 0
      m4/ax_atomic.m4
  5. 214 105
      src/sniffer.c
  6. 5 1
      sslSniffer/README.md
  7. 464 99
      sslSniffer/sslSnifferTest/snifftest.c
  8. 1 2
      wolfcrypt/src/wc_port.c
  9. 21 0
      wolfssl/sniffer.h
  10. 31 24
      wolfssl/wolfcrypt/wc_port.h

+ 1 - 0
CMakeLists.txt

@@ -103,6 +103,7 @@ check_function_exists("inet_ntoa" HAVE_INET_NTOA)
 check_function_exists("memset" HAVE_MEMSET)
 check_function_exists("socket" HAVE_SOCKET)
 check_function_exists("strftime" HAVE_STRFTIME)
+check_function_exists("__atomic_fetch_add" HAVE_C___ATOMIC)
 
 include(CheckTypeSize)
 

+ 1 - 0
Makefile.am

@@ -153,6 +153,7 @@ EXTRA_DIST+= LPCExpresso.cproject
 EXTRA_DIST+= LPCExpresso.project
 EXTRA_DIST+= resource.h wolfssl.rc
 EXTRA_DIST+= CMakeLists.txt
+EXTRA_DIST+= m4/ax_atomic.m4
 
 include cmake/include.am
 include wrapper/include.am

+ 2 - 1
configure.ac

@@ -100,12 +100,13 @@ fi
 AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h netdb.h netinet/in.h stddef.h time.h sys/ioctl.h sys/socket.h sys/time.h errno.h sys/un.h])
 AC_CHECK_LIB([network],[socket])
 AC_C_BIGENDIAN
+AC_C___ATOMIC
 
 # check if functions of interest are linkable, but also check if
 # they're declared by the expected headers, and if not, supersede the
 # unusable positive from AC_CHECK_FUNCS().
 AC_CHECK_FUNCS([gethostbyname getaddrinfo gettimeofday gmtime_r gmtime_s inet_ntoa memset socket strftime atexit])
-AC_CHECK_DECLS([gethostbyname, getaddrinfo, gettimeofday, gmtime_r, gmtime_s, inet_ntoa, memset, socket, strftime], [], [
+AC_CHECK_DECLS([gethostbyname, getaddrinfo, gettimeofday, gmtime_r, gmtime_s, inet_ntoa, memset, socket, strftime, atexit], [], [
 if test "$(eval echo \$"$(eval 'echo ac_cv_func_${as_decl_name}')")" = "yes"
 then
     AC_MSG_NOTICE([    note: earlier check for $(eval 'echo ${as_decl_name}') superseded.])

+ 35 - 0
m4/ax_atomic.m4

@@ -0,0 +1,35 @@
+# AC_C___ATOMIC
+# -------------
+# Define HAVE_C___ATOMIC if __atomic works.
+AN_IDENTIFIER([__atomic], [AC_C___ATOMIC])
+AC_DEFUN([AC_C___ATOMIC],
+[AC_CACHE_CHECK([for __atomic], ac_cv_c___atomic,
+[AC_LINK_IFELSE(
+   [AC_LANG_SOURCE(
+      [[int
+        main (int argc, char **argv)
+        {
+          volatile unsigned long ul1 = 1, ul2 = 0, ul3 = 2;
+          __atomic_load_n(&ul1, __ATOMIC_SEQ_CST);
+          __atomic_compare_exchange(&ul1, &ul2, &ul3, 1, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
+          __atomic_fetch_add(&ul1, 1, __ATOMIC_SEQ_CST);
+          __atomic_fetch_sub(&ul3, 1, __ATOMIC_SEQ_CST);
+          __atomic_or_fetch(&ul1, ul2, __ATOMIC_SEQ_CST);
+          __atomic_and_fetch(&ul1, ul2, __ATOMIC_SEQ_CST);
+          volatile unsigned long long ull1 = 1, ull2 = 0, ull3 = 2;
+          __atomic_load_n(&ull1, __ATOMIC_SEQ_CST);
+          __atomic_compare_exchange(&ull1, &ull2, &ull3, 1, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
+          __atomic_fetch_add(&ull1, 1, __ATOMIC_SEQ_CST);
+          __atomic_fetch_sub(&ull3, 1, __ATOMIC_SEQ_CST);
+          __atomic_or_fetch(&ull1, ull2, __ATOMIC_SEQ_CST);
+          __atomic_and_fetch(&ull1, ull2, __ATOMIC_SEQ_CST);
+          return 0;
+        }
+      ]])],
+   [ac_cv_c___atomic=yes],
+   [ac_cv_c___atomic=no])])
+if test $ac_cv_c___atomic = yes; then
+  AC_DEFINE([HAVE_C___ATOMIC], 1,
+           [Define to 1 if __atomic operations work.])
+fi
+])# AC_C___ATOMIC

+ 214 - 105
src/sniffer.c

@@ -151,6 +151,7 @@ enum {
     TRACE_MSG_SZ       = 80,  /* Trace Message buffer size */
     HASH_SIZE          = 499, /* Session Hash Table Rows */
     PSEUDO_HDR_SZ      = 12,  /* TCP Pseudo Header size in bytes */
+    STREAM_INFO_SZ     = 44,  /* SnifferStreamInfo size in bytes */
     FATAL_ERROR_STATE  = 1,   /* SnifferSession fatal error state */
     TICKET_HINT_LEN    = 4,   /* Session Ticket Hint length */
     TICKET_HINT_AGE_LEN= 4,   /* Session Ticket Age add length */
@@ -420,16 +421,6 @@ typedef struct NamedKey {
 
 #endif
 
-
-typedef struct IpAddrInfo {
-    int version;
-    union {
-        word32 ip4;
-        byte   ip6[16];
-    };
-} IpAddrInfo;
-
-
 /* Sniffer Server holds info for each server/port monitored */
 typedef struct SnifferServer {
     WOLFSSL_CTX*   ctx;                          /* SSL context */
@@ -563,21 +554,27 @@ typedef struct SnifferSession {
 
 
 /* Sniffer Server List and mutex */
-static WOLFSSL_GLOBAL SnifferServer* ServerList = NULL;
+static THREAD_LS_T WOLFSSL_GLOBAL SnifferServer* ServerList = NULL;
+#ifndef HAVE_C___ATOMIC
 static WOLFSSL_GLOBAL wolfSSL_Mutex ServerListMutex;
+#endif
 
 /* Session Hash Table, mutex, and count */
-static WOLFSSL_GLOBAL SnifferSession* SessionTable[HASH_SIZE];
+static THREAD_LS_T WOLFSSL_GLOBAL SnifferSession* SessionTable[HASH_SIZE];
+#ifndef HAVE_C___ATOMIC
 static WOLFSSL_GLOBAL wolfSSL_Mutex SessionMutex;
-static WOLFSSL_GLOBAL int SessionCount = 0;
+#endif
+static THREAD_LS_T WOLFSSL_GLOBAL int SessionCount = 0;
 
-/* Recovery of missed data switches and stats */
-static WOLFSSL_GLOBAL wolfSSL_Mutex RecoveryMutex; /* for stats */
 static WOLFSSL_GLOBAL int RecoveryEnabled    = 0;  /* global switch */
 static WOLFSSL_GLOBAL int MaxRecoveryMemory  = -1;
                                            /* per session max recovery memory */
+#ifndef WOLFSSL_SNIFFER_NO_RECOVERY
+/* Recovery of missed data switches and stats */
+static WOLFSSL_GLOBAL wolfSSL_Mutex RecoveryMutex; /* for stats */
+/* # of sessions with missed data */
 static WOLFSSL_GLOBAL word32 MissedDataSessions = 0;
-                                            /* # of sessions with missed data */
+#endif
 
 /* Connection Info Callback */
 static WOLFSSL_GLOBAL SSLConnCb ConnectionCb;
@@ -606,25 +603,45 @@ static WOLFSSL_GLOBAL SSLStoreDataCb StoreDataCb;
 #endif
 
 
+#ifndef WOLFSSL_SNIFFER_NO_RECOVERY
 static void UpdateMissedDataSessions(void)
 {
     wc_LockMutex(&RecoveryMutex);
     MissedDataSessions += 1;
     wc_UnLockMutex(&RecoveryMutex);
 }
-
+#endif
 
 #ifdef WOLFSSL_SNIFFER_STATS
-#define LOCK_STAT() do { wc_LockMutex(&StatsMutex); } while (0)
-#define UNLOCK_STAT() do { wc_UnLockMutex(&StatsMutex); } while (0)
-#define NOLOCK_ADD_TO_STAT(x,y) do { TraceStat(#x, y); x += y; } while (0)
-#define NOLOCK_INC_STAT(x) NOLOCK_ADD_TO_STAT(x,1)
-#define ADD_TO_STAT(x,y) do { LOCK_STAT(); \
-    NOLOCK_ADD_TO_STAT(x,y); UNLOCK_STAT(); } while (0)
-#define INC_STAT(x) do { LOCK_STAT(); \
-    NOLOCK_INC_STAT(x); UNLOCK_STAT(); } while (0)
+    #ifdef HAVE_C___ATOMIC
+        #define LOCK_STAT()
+        #define UNLOCK_STAT()
+        #define NOLOCK_ADD_TO_STAT(x,y) ({ TraceStat(#x, y); \
+            __atomic_fetch_add(&x, y, __ATOMIC_RELAXED); })
+    #else
+        #define LOCK_STAT() wc_LockMutex(&StatsMutex)
+        #define UNLOCK_STAT() wc_UnLockMutex(&StatsMutex)
+        #define NOLOCK_ADD_TO_STAT(x,y) ({ TraceStat(#x, y); x += y; })
+    #endif
+    #define NOLOCK_INC_STAT(x) NOLOCK_ADD_TO_STAT(x,1)
+    #define ADD_TO_STAT(x,y) do { LOCK_STAT(); \
+        NOLOCK_ADD_TO_STAT(x,y); UNLOCK_STAT(); } while (0)
+    #define INC_STAT(x) do { LOCK_STAT(); \
+        NOLOCK_INC_STAT(x); UNLOCK_STAT(); } while (0)
 #endif /* WOLFSSL_SNIFFER_STATS */
 
+#ifdef HAVE_C___ATOMIC
+    #define LOCK_SESSION()
+    #define UNLOCK_SESSION()
+    #define LOCK_SERVER_LIST()
+    #define UNLOCK_SERVER_LIST()
+#else
+    #define LOCK_SESSION() wc_LockMutex(&SessionMutex)
+    #define UNLOCK_SESSION() wc_UnLockMutex(&SessionMutex)
+    #define LOCK_SERVER_LIST() wc_LockMutex(&ServerListMutex)
+    #define UNLOCK_SERVER_LIST() wc_UnLockMutex(&ServerListMutex)
+#endif
+
 
 #if defined(WOLF_CRYPTO_CB) || defined(WOLFSSL_ASYNC_CRYPT)
     static WOLFSSL_GLOBAL int CryptoDeviceId = INVALID_DEVID;
@@ -635,9 +652,13 @@ static void UpdateMissedDataSessions(void)
 void ssl_InitSniffer_ex(int devId)
 {
     wolfSSL_Init();
+#ifndef HAVE_C___ATOMIC
     wc_InitMutex(&ServerListMutex);
     wc_InitMutex(&SessionMutex);
+#endif
+#ifndef WOLFSSL_SNIFFER_NO_RECOVERY
     wc_InitMutex(&RecoveryMutex);
+#endif
 #ifdef WOLFSSL_SNIFFER_STATS
     XMEMSET(&SnifferStats, 0, sizeof(SSLStats));
     wc_InitMutex(&StatsMutex);
@@ -648,7 +669,7 @@ void ssl_InitSniffer_ex(int devId)
     (void)devId;
 }
 
-void ssl_InitSniffer(void)
+static int GetDevId(void)
 {
     int devId = INVALID_DEVID;
 
@@ -666,17 +687,50 @@ void ssl_InitSniffer(void)
     }
     #endif
 #endif
+
+    return devId;
+}
+
+void ssl_InitSniffer(void)
+{
+    int devId;
+
+    devId = GetDevId();
+
 #ifdef WOLFSSL_ASYNC_CRYPT
     if (wolfAsync_DevOpen(&devId) < 0) {
         fprintf(stderr, "Async device open failed\nRunning without async\n");
         devId = INVALID_DEVID;
     }
 #endif /* WOLFSSL_ASYNC_CRYPT */
+
     (void)devId;
 
     ssl_InitSniffer_ex(devId);
 }
 
+void ssl_InitSniffer_ex2(int threadNum)
+{
+    int devId;
+
+    devId = GetDevId();
+
+#ifdef WOLFSSL_ASYNC_CRYPT
+#ifndef WC_NO_ASYNC_THREADING
+    if (wolfAsync_DevOpenThread(&devId,&threadNum) < 0) {
+#else
+    if (wolfAsync_DevOpen(&devId) < 0) {
+#endif
+        fprintf(stderr, "Async device open failed\nRunning without async\n");
+        devId = INVALID_DEVID;
+    }
+#endif /* WOLFSSL_ASYNC_CRYPT */
+
+    (void)devId;
+    (void)threadNum;
+
+    ssl_InitSniffer_ex(devId);
+}
 
 #ifdef HAVE_SNI
 
@@ -787,8 +841,8 @@ void ssl_FreeSniffer(void)
     SnifferSession* removeSession;
     int i;
 
-    wc_LockMutex(&ServerListMutex);
-    wc_LockMutex(&SessionMutex);
+    LOCK_SERVER_LIST();
+    LOCK_SESSION();
 
     /* Free sessions (wolfSSL objects) first */
     for (i = 0; i < HASH_SIZE; i++) {
@@ -810,12 +864,15 @@ void ssl_FreeSniffer(void)
     }
     ServerList = NULL;
 
-    wc_UnLockMutex(&SessionMutex);
-    wc_UnLockMutex(&ServerListMutex);
-
+    UNLOCK_SESSION();
+    UNLOCK_SERVER_LIST();
+#ifndef WOLFSSL_SNIFFER_NO_RECOVERY
     wc_FreeMutex(&RecoveryMutex);
+#endif
+#ifndef HAVE_C___ATOMIC
     wc_FreeMutex(&SessionMutex);
     wc_FreeMutex(&ServerListMutex);
+#endif
 
 #ifdef WOLF_CRYPTO_CB
     #ifdef HAVE_INTEL_QA_SYNC
@@ -1367,7 +1424,7 @@ static int IsServerRegistered(word32 addr)
     int ret = 0;     /* false */
     SnifferServer* sniffer;
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
 
     sniffer = ServerList;
     while (sniffer) {
@@ -1378,7 +1435,7 @@ static int IsServerRegistered(word32 addr)
         sniffer = sniffer->next;
     }
 
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     return ret;
 }
@@ -1392,7 +1449,7 @@ static int IsServerRegistered6(byte* addr)
     int ret = 0;     /* false */
     SnifferServer* sniffer;
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
 
     sniffer = ServerList;
     while (sniffer) {
@@ -1404,7 +1461,7 @@ static int IsServerRegistered6(byte* addr)
         sniffer = sniffer->next;
     }
 
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     return ret;
 }
@@ -1417,7 +1474,7 @@ static int IsPortRegistered(word32 port)
     int ret = 0;    /* false */
     SnifferServer* sniffer;
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
 
     sniffer = ServerList;
     while (sniffer) {
@@ -1428,7 +1485,7 @@ static int IsPortRegistered(word32 port)
         sniffer = sniffer->next;
     }
 
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     return ret;
 }
@@ -1441,7 +1498,7 @@ static SnifferServer* GetSnifferServer(IpInfo* ipInfo, TcpInfo* tcpInfo)
 {
     SnifferServer* sniffer;
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
 
     sniffer = ServerList;
 
@@ -1464,7 +1521,7 @@ static SnifferServer* GetSnifferServer(IpInfo* ipInfo, TcpInfo* tcpInfo)
     (void)tcpInfo;
 #endif
 
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     return sniffer;
 }
@@ -1501,8 +1558,7 @@ static SnifferSession* GetSnifferSession(IpInfo* ipInfo, TcpInfo* tcpInfo)
     time_t          currTime = wc_Time(NULL);
     word32          row = SessionHash(ipInfo, tcpInfo);
 
-    wc_LockMutex(&SessionMutex);
-
+    LOCK_SESSION();
     session = SessionTable[row];
     while (session) {
         if (MatchAddr(session->server, ipInfo->src) &&
@@ -1523,7 +1579,7 @@ static SnifferSession* GetSnifferSession(IpInfo* ipInfo, TcpInfo* tcpInfo)
     if (session)
         session->lastUsed= currTime; /* keep session alive, remove stale will */
                                      /* leave alone */
-    wc_UnLockMutex(&SessionMutex);
+    UNLOCK_SESSION();
 
     /* determine side */
     if (session) {
@@ -1660,10 +1716,10 @@ static int CreateWatchSnifferServer(char* error)
 #endif
 
     /* add to server list */
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     sniffer->next = ServerList;
     ServerList = sniffer;
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     return 0;
 }
@@ -1839,10 +1895,10 @@ int ssl_SetNamedPrivateKey(const char* name,
     TraceHeader();
     TraceSetNamedServer(name, address, port, keyFile);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(name, address, port, keyFile, 0,
                              typeKey, password, error, 0);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1860,10 +1916,10 @@ int ssl_SetNamedPrivateKeyBuffer(const char* name,
     TraceHeader();
     TraceSetNamedServer(name, address, port, NULL);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(name, address, port, keyBuf, keySz,
                              typeKey, password, error, 0);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1883,10 +1939,10 @@ int ssl_SetPrivateKey(const char* address, int port,
     TraceHeader();
     TraceSetServer(address, port, keyFile);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(NULL, address, port, keyFile, 0,
                              typeKey, password, error, 0);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1903,10 +1959,10 @@ int ssl_SetPrivateKeyBuffer(const char* address, int port,
     TraceHeader();
     TraceSetServer(address, port, "from buffer");
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(NULL, address, port, keyBuf, keySz,
                              typeKey, password, error, 0);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1928,10 +1984,10 @@ int ssl_SetNamedEphemeralKey(const char* name,
     TraceHeader();
     TraceSetNamedServer(name, address, port, keyFile);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(name, address, port, keyFile, 0,
                              typeKey, password, error, 1);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1949,10 +2005,10 @@ int ssl_SetNamedEphemeralKeyBuffer(const char* name,
     TraceHeader();
     TraceSetNamedServer(name, address, port, NULL);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(name, address, port, keyBuf, keySz,
                              typeKey, password, error, 1);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1972,10 +2028,10 @@ int ssl_SetEphemeralKey(const char* address, int port,
     TraceHeader();
     TraceSetServer(address, port, keyFile);
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(NULL, address, port, keyFile, 0,
                              typeKey, password, error, 1);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -1992,10 +2048,10 @@ int ssl_SetEphemeralKeyBuffer(const char* address, int port,
     TraceHeader();
     TraceSetServer(address, port, "from buffer");
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
     ret = SetNamedPrivateKey(NULL, address, port, keyBuf, keySz,
                              typeKey, password, error, 1);
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
     if (ret == 0)
         Trace(NEW_SERVER_STR);
@@ -2057,15 +2113,18 @@ static int CheckIp6Hdr(Ip6Hdr* iphdr, IpInfo* info, int length, char* error)
 /* Check IP Header for IPV4, TCP, and a registered server address */
 /* If header IPv6, pass to CheckIp6Hdr(). */
 /* returns 0 on success, -1 on error */
-static int CheckIpHdr(IpHdr* iphdr, IpInfo* info, int length, char* error)
+static int CheckIpHdr(IpHdr* iphdr, IpInfo* info, int length, char* error,
+                      int trace)
 {
     int version = IP_V(iphdr);
 
     if (version == IPV6)
         return CheckIp6Hdr((Ip6Hdr*)iphdr, info, length, error);
 
-    TraceIP(iphdr);
-    Trace(IP_CHECK_STR);
+    if (trace) {
+        TraceIP(iphdr);
+        Trace(IP_CHECK_STR);
+    }
 
     if (version != IPV4) {
         SetError(BAD_IPVER_STR, error, NULL, 0);
@@ -2077,13 +2136,6 @@ static int CheckIpHdr(IpHdr* iphdr, IpInfo* info, int length, char* error)
         return -1;
     }
 
-#ifndef WOLFSSL_SNIFFER_WATCH
-    if (!IsServerRegistered(iphdr->src) && !IsServerRegistered(iphdr->dst)) {
-        SetError(SERVER_NOT_REG_STR, error, NULL, 0);
-        return -1;
-    }
-#endif
-
     info->length  = IP_HL(iphdr);
     info->total   = XNTOHS(iphdr->length);
     info->src.version = IPV4;
@@ -2100,10 +2152,13 @@ static int CheckIpHdr(IpHdr* iphdr, IpInfo* info, int length, char* error)
 
 /* Check TCP Header for a registered port */
 /* returns 0 on success, -1 on error */
-static int CheckTcpHdr(TcpHdr* tcphdr, TcpInfo* info, char* error)
+static int CheckTcpHdr(TcpHdr* tcphdr, TcpInfo* info, char* error, int trace)
 {
-    TraceTcp(tcphdr);
-    Trace(TCP_CHECK_STR);
+    if (trace) {
+        TraceTcp(tcphdr);
+        Trace(TCP_CHECK_STR);
+    }
+
     info->srcPort   = XNTOHS(tcphdr->srcPort);
     info->dstPort   = XNTOHS(tcphdr->dstPort);
     info->length    = TCP_LEN(tcphdr);
@@ -2115,14 +2170,7 @@ static int CheckTcpHdr(TcpHdr* tcphdr, TcpInfo* info, char* error)
     if (info->ack)
         info->ackNumber = XNTOHL(tcphdr->ack);
 
-#ifndef WOLFSSL_SNIFFER_WATCH
-    if (!IsPortRegistered(info->srcPort) && !IsPortRegistered(info->dstPort)) {
-        SetError(SERVER_PORT_NOT_REG_STR, error, NULL, 0);
-        return -1;
-    }
-#else
     (void)error;
-#endif
 
     return 0;
 }
@@ -4912,20 +4960,26 @@ static void RemoveSession(SnifferSession* session, IpInfo* ipInfo,
     SnifferSession* previous = 0;
     SnifferSession* current;
     word32          row = rowHint;
+#ifndef HAVE_C___ATOMIC
     int             haveLock = 0;
-
+#endif
     Trace(REMOVE_SESSION_STR);
 
     if (ipInfo && tcpInfo)
         row = SessionHash(ipInfo, tcpInfo);
+#ifndef HAVE_C___ATOMIC
     else
         haveLock = 1;
+#endif
 
     if (row >= HASH_SIZE)
         return;
 
-    if (!haveLock)
-        wc_LockMutex(&SessionMutex);
+#ifndef HAVE_C___ATOMIC
+    if (!haveLock) {
+        LOCK_SESSION();
+    }
+#endif
 
     current = SessionTable[row];
 
@@ -4943,8 +4997,11 @@ static void RemoveSession(SnifferSession* session, IpInfo* ipInfo,
         current  = current->next;
     }
 
-    if (!haveLock)
-        wc_UnLockMutex(&SessionMutex);
+#ifndef HAVE_C___ATOMIC
+    if (!haveLock) {
+        UNLOCK_SESSION();
+    }
+#endif
 }
 
 
@@ -5043,7 +5100,7 @@ static SnifferSession* CreateSession(IpInfo* ipInfo, TcpInfo* tcpInfo,
     row = SessionHash(ipInfo, tcpInfo);
 
     /* add it to the session table */
-    wc_LockMutex(&SessionMutex);
+    LOCK_SESSION();
 
     session->next = SessionTable[row];
     SessionTable[row] = session;
@@ -5055,7 +5112,7 @@ static SnifferSession* CreateSession(IpInfo* ipInfo, TcpInfo* tcpInfo,
         RemoveStaleSessions();
     }
 
-    wc_UnLockMutex(&SessionMutex);
+    UNLOCK_SESSION();
 
     /* CreateSession is called in response to a SYN packet, we know this
      * is headed to the server. Also we know the server is one we care
@@ -5114,7 +5171,7 @@ static int DoOldHello(SnifferSession* session, const byte* sslFrame,
    TcpChecksum(&ipInfo, &tcpInfo, sslBytes, packet + ipInfo.length);
    could also add a 64bit version if type available and using this
 */
-int TcpChecksum(IpInfo* ipInfo, TcpInfo* tcpInfo, int dataLen,
+static int TcpChecksum(IpInfo* ipInfo, TcpInfo* tcpInfo, int dataLen,
                 const byte* packet)
 {
     TcpPseudoHdr  pseudo;
@@ -5123,8 +5180,8 @@ int TcpChecksum(IpInfo* ipInfo, TcpInfo* tcpInfo, int dataLen,
     word32        sum = 0;
     word16        checksum;
 
-    pseudo.src = ipInfo->src;
-    pseudo.dst = ipInfo->dst;
+    pseudo.src = ipInfo->src.ip4;
+    pseudo.dst = ipInfo->dst.ip4;
     pseudo.rsv = 0;
     pseudo.protocol = TCP_PROTO;
     pseudo.length = htons(tcpInfo->length + dataLen);
@@ -5167,13 +5224,17 @@ int TcpChecksum(IpInfo* ipInfo, TcpInfo* tcpInfo, int dataLen,
 /* Check IP and TCP headers, set payload */
 /* returns 0 on success, -1 on error */
 static int CheckHeaders(IpInfo* ipInfo, TcpInfo* tcpInfo, const byte* packet,
-                  int length, const byte** sslFrame, int* sslBytes, char* error)
+    int length, const byte** sslFrame, int* sslBytes, char* error,
+    int checkReg, int trace)
 {
     IpHdr* iphdr = (IpHdr*)packet;
+    TcpHdr* tcphdr;
     int version;
 
-    TraceHeader();
-    TracePacket();
+    if (trace) {
+        TraceHeader();
+        TracePacket();
+    }
 
     /* ip header */
     if (length < IP_HDR_SZ) {
@@ -5191,17 +5252,35 @@ static int CheckHeaders(IpInfo* ipInfo, TcpInfo* tcpInfo, const byte* packet,
         }
     }
 
-    if (CheckIpHdr((IpHdr*)packet, ipInfo, length, error) != 0)
+    if (CheckIpHdr(iphdr, ipInfo, length, error, trace) != 0)
         return -1;
 
+#ifndef WOLFSSL_SNIFFER_WATCH
+    if (checkReg &&
+           !IsServerRegistered(iphdr->src) && !IsServerRegistered(iphdr->dst)) {
+        SetError(SERVER_NOT_REG_STR, error, NULL, 0);
+        return -1;
+    }
+#endif
+
     /* tcp header */
     if (length < (ipInfo->length + TCP_HDR_SZ)) {
         SetError(PACKET_HDR_SHORT_STR, error, NULL, 0);
         return -1;
     }
-    if (CheckTcpHdr((TcpHdr*)(packet + ipInfo->length), tcpInfo, error) != 0)
+    tcphdr = (TcpHdr*)(packet + ipInfo->length);
+    if (CheckTcpHdr(tcphdr, tcpInfo, error, trace) != 0)
         return -1;
 
+#ifndef WOLFSSL_SNIFFER_WATCH
+    if (checkReg &&
+         !IsPortRegistered(tcpInfo->srcPort) &&
+            !IsPortRegistered(tcpInfo->dstPort)) {
+        SetError(SERVER_PORT_NOT_REG_STR, error, NULL, 0);
+        return -1;
+    }
+#endif
+
     /* setup */
     *sslFrame = packet + ipInfo->length + tcpInfo->length;
     if (*sslFrame > packet + length) {
@@ -5213,6 +5292,8 @@ static int CheckHeaders(IpInfo* ipInfo, TcpInfo* tcpInfo, const byte* packet,
      * data after the IP record for the FCS for Ethernet. */
     *sslBytes = (int)(packet + ipInfo->total - *sslFrame);
 
+    (void)checkReg;
+
     return 0;
 }
 
@@ -5308,7 +5389,6 @@ static PacketBuffer* CreateBuffer(word32* begin, word32 end, const byte* data,
     return pb;
 }
 
-
 /* Add sslFrame to Reassembly List */
 /* returns 1 (end) on success, -1, on error */
 static int AddToReassembly(byte from, word32 seq, const byte* sslFrame,
@@ -5410,7 +5490,6 @@ static int AddToReassembly(byte from, word32 seq, const byte* sslFrame,
     return 1;
 }
 
-
 /* Add out of order FIN capture */
 /* returns 1 for success (end) */
 static int AddFinCapture(SnifferSession* session, word32 sequence)
@@ -5809,7 +5888,9 @@ static int CheckSequence(IpInfo* ipInfo, TcpInfo* tcpInfo,
     TraceSequence(tcpInfo->sequence, *sslBytes);
     if (CheckAck(tcpInfo, session) < 0) {
         if (!RecoveryEnabled) {
+        #ifndef WOLFSSL_SNIFFER_NO_RECOVERY
             UpdateMissedDataSessions();
+        #endif
             SetError(ACK_MISSED_STR, error, session, FATAL_ERROR_STATE);
             return -1;
         }
@@ -5817,7 +5898,9 @@ static int CheckSequence(IpInfo* ipInfo, TcpInfo* tcpInfo,
             SetError(ACK_MISSED_STR, error, session, 0);
             if (*ackFault == 0) {
                 *ackFault = 1;
+            #ifndef WOLFSSL_SNIFFER_NO_RECOVERY
                 UpdateMissedDataSessions();
+            #endif
             }
             return FixSequence(tcpInfo, session);
         }
@@ -6393,6 +6476,29 @@ static int RemoveFatalSession(IpInfo* ipInfo, TcpInfo* tcpInfo,
     return 0;
 }
 
+int ssl_DecodePacket_GetStream(SnifferStreamInfo* info, const byte* packet,
+        int length, char* error )
+{
+    TcpInfo           tcpInfo;
+    IpInfo            ipInfo;
+    const byte*       sslFrame = NULL;
+    int               sslBytes = 0;
+
+    XMEMSET(&tcpInfo, 0, sizeof(tcpInfo));
+    XMEMSET(&ipInfo, 0, sizeof(ipInfo));
+
+    if (CheckHeaders(&ipInfo, &tcpInfo, packet, length, &sslFrame, &sslBytes,
+            error, 0, 0) != 0) {
+        return WOLFSSL_SNIFFER_ERROR;
+    }
+
+    info->src     = ipInfo.src;
+    info->dst     = ipInfo.dst;
+    info->srcPort = tcpInfo.srcPort;
+    info->dstPort = tcpInfo.dstPort;
+
+    return 0;
+}
 
 /* Passes in an IP/TCP packet for decoding (ethernet/localhost frame) removed */
 /* returns Number of bytes on success, 0 for no data yet, and
@@ -6412,7 +6518,6 @@ static int ssl_DecodePacketInternal(const byte* packet, int length, int isChain,
     SnifferSession*   session = NULL;
     void* vChain = NULL;
     word32 chainSz = 0;
-
     if (isChain) {
 #ifdef WOLFSSL_SNIFFER_CHAIN_INPUT
         struct iovec* chain;
@@ -6433,8 +6538,9 @@ static int ssl_DecodePacketInternal(const byte* packet, int length, int isChain,
     }
 
     if (CheckHeaders(&ipInfo, &tcpInfo, packet, length, &sslFrame, &sslBytes,
-                     error) != 0)
+                     error, 1, 1) != 0) {
         return WOLFSSL_SNIFFER_ERROR;
+    }
 
     end = sslFrame + sslBytes;
 
@@ -6697,9 +6803,11 @@ int ssl_GetSessionStats(unsigned int* active,     unsigned int* total,
     int ret;
 
     if (missedData) {
+    #ifndef WOLFSSL_SNIFFER_NO_RECOVERY
         wc_LockMutex(&RecoveryMutex);
         *missedData = MissedDataSessions;
         wc_UnLockMutex(&RecoveryMutex);
+    #endif
     }
 
     if (reassemblyMem) {
@@ -6707,7 +6815,8 @@ int ssl_GetSessionStats(unsigned int* active,     unsigned int* total,
         int i;
 
         *reassemblyMem = 0;
-        wc_LockMutex(&SessionMutex);
+        LOCK_SESSION();
+
         for (i = 0; i < HASH_SIZE; i++) {
             session = SessionTable[i];
             while (session) {
@@ -6716,7 +6825,7 @@ int ssl_GetSessionStats(unsigned int* active,     unsigned int* total,
                 session = session->next;
             }
         }
-        wc_UnLockMutex(&SessionMutex);
+        UNLOCK_SESSION();
     }
 
     ret = wolfSSL_get_session_stats(active, total, peak, maxSessions);
@@ -6953,7 +7062,7 @@ int ssl_PollSniffer(WOLF_EVENT** events, int maxEvents, WOLF_EVENT_FLAG flags,
     int i;
     SnifferServer* srv;
 
-    wc_LockMutex(&ServerListMutex);
+    LOCK_SERVER_LIST();
 
     /* Iterate the open sniffer sessions calling wolfSSL_CTX_AsyncPoll */
     srv = ServerList;
@@ -6976,11 +7085,11 @@ int ssl_PollSniffer(WOLF_EVENT** events, int maxEvents, WOLF_EVENT_FLAG flags,
         srv = srv->next;
     }
 
-    wc_UnLockMutex(&ServerListMutex);
+    UNLOCK_SERVER_LIST();
 
 
     /* iterate list and mark polled */
-    wc_LockMutex(&SessionMutex);
+    LOCK_SESSION();
     for (i=0; i<eventCount; i++) {
         WOLFSSL* ssl = (WOLFSSL*)events[i]->context;
         SnifferSession* session = FindSession(ssl);
@@ -6989,7 +7098,7 @@ int ssl_PollSniffer(WOLF_EVENT** events, int maxEvents, WOLF_EVENT_FLAG flags,
             session->sslServer->error = events[i]->ret;
         }
     }
-    wc_UnLockMutex(&SessionMutex);
+    UNLOCK_SESSION();
 
     *pEventCount = eventCount;
 

+ 5 - 1
sslSniffer/README.md

@@ -122,7 +122,7 @@ The following table lists the accepted inputs in saved file mode.
 
 Synopsis:
 
-`snifftest  dumpFile pemKey [server] [port] [password]`
+`snifftest  dumpFile pemKey [server] [port] [password] [threads]`
 
 `snifftest` Options Summary:
 
@@ -133,6 +133,7 @@ pemKey      The server’s private key in PEM format      NA
 server      The server’s IP address (v4 or v6)          127.0.0.1
 port        The server port to sniff                    443
 password    Private Key Password if required            NA
+threads     The number of threads to run with           5
 ```
 
 To decode a pcap file named test.pcap with a server key file called myKey.pem that was generated on the localhost with a server at port 443 just use:
@@ -147,6 +148,9 @@ If the server was on localhost using IPv6 and on port 12345 you could instead us
 
 `./snifftest test.pcap myKey.pem ::1 12345`
 
+If you wanted to use 15 threads to decode `test.pcap` and your key does not require a password, you could use a dummy password and run:
+
+`./snifftest test.pcap myKey.pem 10.0.1.2 12345 pass 15`
 
 ## API Usage
 

+ 464 - 99
sslSniffer/sslSnifferTest/snifftest.c

@@ -38,6 +38,10 @@
     #define WOLFSSL_SNIFFER
 #endif
 
+#ifdef THREADED_SNIFFTEST
+#include <pthread.h>
+#endif
+
 #ifndef WOLFSSL_SNIFFER
 #ifndef NO_MAIN_DRIVER
 /* blank build */
@@ -466,20 +470,28 @@ static void show_usage(void)
     printf("usage:\n");
     printf("\t./snifftest\n");
     printf("\t\tprompts for options\n");
+#ifdef THREADED_SNIFFTEST
+    printf("\t./snifftest dump pemKey [server] [port] [password] [threads]\n");
+#else
     printf("\t./snifftest dump pemKey [server] [port] [password]\n");
+#endif
 }
 
-
-#ifdef WOLFSSL_ASYNC_CRYPT
-
 typedef struct SnifferPacket {
     byte* packet;
     int   length;
     int   lastRet;
     int   packetNumber;
+#ifdef THREADED_SNIFFTEST
+    struct SnifferPacket* next;
+    struct SnifferPacket* prev;
+    int    placeholder;
+#endif
 } SnifferPacket;
 
-static SnifferPacket asyncQueue[WOLF_ASYNC_MAX_PENDING];
+#ifdef WOLFSSL_ASYNC_CRYPT
+
+static THREAD_LS_T SnifferPacket asyncQueue[WOLF_ASYNC_MAX_PENDING];
 
 /* returns index to queue */
 static int SnifferAsyncQueueAdd(int lastRet, void* chain, int chainSz,
@@ -513,7 +525,8 @@ static int SnifferAsyncQueueAdd(int lastRet, void* chain, int chainSz,
         }
     }
     if (ret != MEMORY_E) {
-        asyncQueue[ret].packet = XMALLOC(length, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+        asyncQueue[ret].packet = (byte*)XMALLOC(length, NULL,
+                                                DYNAMIC_TYPE_TMP_BUFFER);
         if (asyncQueue[ret].packet == NULL) {
             return MEMORY_E;
         }
@@ -569,6 +582,357 @@ static int SnifferAsyncPollQueue(byte** data, char* err, SSLInfo* sslInfo,
 }
 #endif /* WOLFSSL_ASYNC_CRYPT */
 
+#ifdef THREADED_SNIFFTEST
+
+typedef struct {
+    volatile int lockCount;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+} wm_Sem;
+
+ /* Posix style semaphore */
+static int wm_SemInit(wm_Sem *s){
+    s->lockCount = 0;
+    pthread_mutex_init(&s->mutex, NULL);
+    pthread_cond_init(&s->cond, NULL);
+    return 0;
+}
+static int wm_SemFree(wm_Sem *s){
+    pthread_mutex_destroy(&s->mutex);
+    pthread_cond_destroy(&s->cond);
+    return 0;
+}
+static int wm_SemLock(wm_Sem *s){
+    pthread_mutex_lock(&s->mutex);
+    while (s->lockCount > 0)
+        pthread_cond_wait(&s->cond, &s->mutex);
+    s->lockCount++;
+    pthread_mutex_unlock(&s->mutex);
+    return 0;
+}
+static int wm_SemUnlock(wm_Sem *s){
+    pthread_mutex_lock(&s->mutex);
+    s->lockCount--;
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
+    return 0;
+}
+
+typedef struct SnifferWorker {
+    SnifferPacket *head; /* head for doubly-linked list of sniffer packets */
+    SnifferPacket *tail; /* tail for doubly-linked list of sniffer packets */
+    wm_Sem         sem;
+    pthread_t      tid;
+    char *server;
+    char *keyFilesSrc;
+    char *passwd;
+    int   port;
+    int   hadBadPacket;  /* track if sniffer worker saw bad packet */
+    int   unused;
+    int   id;
+    int   shutdown;
+} SnifferWorker;
+
+static int ssl_Init_SnifferWorker(SnifferWorker* worker, int port,
+        const char* server, const char* keyFilesSrc, const char* passwd, int id)
+{
+    wm_SemInit(&worker->sem);
+    worker->server      = (char*)server;
+    worker->keyFilesSrc = (char*)keyFilesSrc;
+    worker->passwd      = (char*)passwd;
+    worker->port           = port;
+    worker->unused      = 0;
+    worker->shutdown    = 0;
+    worker ->id         = id;
+
+    worker->head = (SnifferPacket*)XMALLOC(sizeof(SnifferPacket), NULL,
+                           DYNAMIC_TYPE_TMP_BUFFER);
+
+    if (worker->head == NULL) {
+        XFREE(worker->head, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+        return MEMORY_E;
+    }
+
+    XMEMSET(worker->head, 0, sizeof(SnifferPacket));
+
+    worker->tail = worker->head;
+    worker->head->packet = NULL;
+    worker->head->next = NULL;
+    worker->head->prev = NULL;
+    worker->head->placeholder  = 1;
+
+    return 0;
+}
+
+static void ssl_Free_SnifferWorker(SnifferWorker* worker)
+{
+    wm_SemFree(&worker->sem);
+
+    if (worker->head) {
+        XFREE(worker->head, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+    }
+}
+
+static int SnifferWorkerPacketAdd(SnifferWorker* worker, int lastRet,
+        byte* packet, int length, int packetNumber)
+{
+    SnifferPacket* newEntry;
+
+    newEntry = (SnifferPacket*)XMALLOC(sizeof(SnifferPacket), NULL,
+                                       DYNAMIC_TYPE_TMP_BUFFER);
+
+    newEntry->packet = (byte*)XMALLOC(length, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+    if (newEntry == NULL || newEntry->packet == NULL) {
+        XFREE(newEntry->packet, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+        XFREE(newEntry, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+        return MEMORY_E;
+    }
+    /* Set newEntry fields to input values */
+    XMEMCPY(newEntry->packet, packet, length);
+    newEntry->length = length;
+    newEntry->lastRet = lastRet;
+    newEntry->packetNumber = packetNumber;
+    newEntry->placeholder = 0;
+
+    /* Create worker head if null */
+    if (worker->head == NULL) {
+        worker->head = (SnifferPacket*)XMALLOC(sizeof(SnifferPacket), NULL,
+                           DYNAMIC_TYPE_TMP_BUFFER);
+        XMEMSET(worker->head, 0, sizeof(SnifferPacket));
+
+        worker->tail = worker->head;
+        worker->head->packet = NULL;
+        worker->head->next = NULL;
+        worker->head->prev = NULL;
+        worker->head->placeholder = 1;
+    }
+
+    if (worker->head->placeholder) {
+        /* First packet added to be to SnifferWorker linked list,
+         * set head and tail to the new packet */
+        XFREE(worker->head, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+        newEntry->next = NULL;
+        newEntry->prev = NULL;
+        worker->head = newEntry;
+        worker->tail = newEntry;
+    }
+    else {
+        /* Add packet to SnifferWorker linked list and move tail */
+        newEntry->prev = worker->tail;
+        newEntry->next = NULL;
+        worker->tail->next = newEntry;
+        worker->tail = newEntry;
+    }
+
+    return 0;
+
+}
+#endif /* THREADED_SNIFFTEST */
+
+static int DecodePacket(byte* packet, int length, int packetNumber, char err[])
+{
+    int     ret, j;
+    int     hadBadPacket = 0;
+    int     isChain      = 0;
+    int     chainSz;
+    void*   chain;
+    byte*   data         = NULL; /* pointer to decrypted data */
+    SSLInfo sslInfo;
+#ifdef WOLFSSL_SNIFFER_CHAIN_INPUT
+    struct iovec chains[CHAIN_INPUT_COUNT];
+    unsigned int remainder;
+
+    j         = 0;
+    chainSz   = 0;
+    isChain   = 1;
+    remainder = length;
+    do {
+        unsigned int chunkSz = min(remainder, CHAIN_INPUT_CHUNK_SIZE);
+        chains[chainSz].iov_base = (void*)(packet + j);
+        chains[chainSz].iov_len = chunkSz;
+        j += chunkSz;
+        remainder -= chunkSz;
+        chainSz++;
+    } while (j < (int)length);
+    chain = (void*)chains;
+#else
+    chain = (void*)packet;
+    chainSz = length;
+#endif
+
+#if defined(DEBUG_SNIFFER)
+    printf("Packet Number: %d\n", packetNumber);
+#endif
+
+    /* decode packet */
+#ifdef WOLFSSL_ASYNC_CRYPT
+    /* For async call the original API again with same data,
+     * or call with different sessions for multiple concurrent
+     * stream processing */
+    ret = ssl_DecodePacketAsync(chain, chainSz, isChain, &data, err,
+        &sslInfo, NULL);
+
+    /* WC_PENDING_E: Hardware is processing or stream is blocked
+     *               (waiting on WC_PENDING_E) */
+    if (ret == WC_PENDING_E) {
+        /* add to queue, for later processing */
+    #ifdef DEBUG_SNIFFER
+        printf("Steam is pending, queue packet %d\n", packetNumber);
+    #endif
+        ret = SnifferAsyncQueueAdd(ret, chain, chainSz, isChain,
+            packetNumber);
+        if (ret >= 0) {
+            ret = 0; /* mark event just added */
+        }
+    }
+
+#elif defined(WOLFSSL_SNIFFER_CHAIN_INPUT) && \
+defined(WOLFSSL_SNIFFER_STORE_DATA_CB)
+    ret = ssl_DecodePacketWithChainSessionInfoStoreData(chain, chainSz,
+            &data, &sslInfo, err);
+#elif defined(WOLFSSL_SNIFFER_CHAIN_INPUT)
+    (void)sslInfo;
+    ret = ssl_DecodePacketWithChain(chain, chainSz, &data, err);
+#else
+#if defined(WOLFSSL_SNIFFER_STORE_DATA_CB)
+    ret = ssl_DecodePacketWithSessionInfoStoreData(packet,
+            length, &data, &sslInfo, err);
+#else
+    ret = ssl_DecodePacketWithSessionInfo(packet, length, &data,
+                                            &sslInfo, err);
+#endif
+    (void)chain;
+    (void)chainSz;
+#endif
+
+    if (ret < 0) {
+        printf("ssl_Decode ret = %d, %s on packet number %d\n", ret, err,
+                packetNumber);
+        hadBadPacket = 1;
+    }
+
+    if (data != NULL && ret > 0) {
+        /* Convert non-printable data to periods. */
+        for (j = 0; j < ret; j++) {
+            if (isprint(data[j]) || isspace(data[j])) continue;
+            data[j] = '.';
+        }
+        data[ret] = 0;
+        printf("SSL App Data(%d:%d):%s\n", packetNumber, ret, data);
+        ssl_FreeZeroDecodeBuffer(&data, ret, err);
+    }
+
+    (void)isChain;
+
+    return hadBadPacket;
+}
+
+#ifdef THREADED_SNIFFTEST
+static void* snifferWorker(void* arg)
+{
+    SnifferWorker* worker = (SnifferWorker*)arg;
+    char err[PCAP_ERRBUF_SIZE];
+
+    ssl_InitSniffer_ex2(worker->id);
+    ssl_Trace("./tracefile.txt", err);
+    ssl_EnableRecovery(1, -1, err);
+#ifdef WOLFSSL_SNIFFER_WATCH
+    ssl_SetWatchKeyCallback(myWatchCb, err);
+#endif
+#ifdef WOLFSSL_SNIFFER_STORE_DATA_CB
+    ssl_SetStoreDataCallback(myStoreDataCb);
+#endif
+
+    load_key(NULL, worker->server, worker->port, worker->keyFilesSrc,
+             worker->passwd, err);
+
+    /* continue processing the workers packets and keep expecting them
+     * until the shutdown flag is set */
+    while (!worker->shutdown) {
+        while (worker->head) {
+            int   ret = 0;
+            byte* packet;
+            int   length;
+            int   packetNumber;
+        #ifdef WOLFSSL_ASYNC_CRYPT
+            SSLInfo sslInfo;
+            byte*   data;
+            int     queueSz = 0;
+
+            /* poll hardware and attempt to process items in queue. If
+             * returns > 0 then data pointer has decrypted something */
+            SnifferAsyncPollQueue(&data, err, &sslInfo, &queueSz);
+            if (queueSz >= WOLF_ASYNC_MAX_PENDING) {
+                /* queue full, poll again */
+                continue;
+            }
+        #endif
+
+            /* Shutdown worker if it was not utilized */
+            if (worker->unused) {
+                XFREE(worker->head, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+                worker->head = NULL;
+                break;
+            }
+
+            /* get lock */
+            wm_SemLock(&worker->sem);
+
+            /* get packet for current worker head */
+            packet       = worker->head->packet;
+            length       = worker->head->length;
+            packetNumber = worker->head->packetNumber;
+
+            wm_SemUnlock(&worker->sem);
+
+            if (packet == NULL) {
+                continue;
+            }
+
+            /* Decode Packet, ret value will indicate whether a
+             * bad packet was encountered */
+            ret = DecodePacket(packet, length, packetNumber, err);
+            if (ret) {
+                worker->hadBadPacket = 1;
+            }
+
+            /* get lock */
+            wm_SemLock(&worker->sem);
+
+            XFREE(worker->head->packet, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+            worker->head->packet = NULL;
+
+            if (worker->head->next) {
+                /* Move head and free */
+                worker->head = worker->head->next;
+                XFREE(worker->head->prev, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+                worker->head->prev = NULL;
+            }
+            else {
+                /* No other packets in list. Keep looping until more packets
+                 * arrive or worker is shutdown. */
+                XFREE(worker->head, NULL, DYNAMIC_TYPE_TMP_BUFFER);
+                worker->head = NULL;
+            }
+            wm_SemUnlock(&worker->sem);
+
+        } /* while (worker->head) */
+
+        if (worker->unused) {
+                break;
+        }
+
+    } /* while (worker->head) */
+
+    /* Thread cleanup */
+    ssl_FreeSniffer();
+#if defined(HAVE_ECC) && defined(FP_ECC)
+    wc_ecc_fp_free();
+#endif
+    return NULL;
+}
+#endif /* THREADED_SNIFFTEST */
+
 int main(int argc, char** argv)
 {
     int          ret = 0;
@@ -577,6 +941,7 @@ int main(int argc, char** argv)
     int          port = 0;
     int          saveFile = 0;
     int          i = 0, defDev = 0;
+    int          packetNumber = 0;
     int          frame = ETHER_IF_FRAME_LEN;
     char         err[PCAP_ERRBUF_SIZE];
     char         filter[32];
@@ -585,30 +950,40 @@ int main(int argc, char** argv)
     char         keyFilesUser[MAX_FILENAME_SZ];
     const char  *server = NULL;
     const char  *sniName = NULL;
+    const char  *passwd = NULL;
     pcap_if_t   *d;
     pcap_addr_t *a;
-    int          isChain = 0;
-    int          j;
-#ifdef WOLFSSL_SNIFFER_CHAIN_INPUT
-    struct iovec chains[CHAIN_INPUT_COUNT];
-    unsigned int remainder;
+#ifdef THREADED_SNIFFTEST
+    int workerThreadCount;
+#ifdef HAVE_SESSION_TICKET
+    /* Multiple threads on resume not yet supported */
+    workerThreadCount = 1;
+#else
+    workerThreadCount = 5;
+    if (argc >= 7)
+        workerThreadCount = XATOI(argv[6]);
+#endif
+    SnifferWorker workers[workerThreadCount];
+    int           used[workerThreadCount];
 #endif
-    int packetNumber = 0;
 
     show_appinfo();
 
     signal(SIGINT, sig_handler);
 
-#ifndef _WIN32
+
+#ifndef THREADED_SNIFFTEST
+    #ifndef _WIN32
     ssl_InitSniffer();   /* dll load on Windows */
-#endif
+    #endif
     ssl_Trace("./tracefile.txt", err);
     ssl_EnableRecovery(1, -1, err);
-#ifdef WOLFSSL_SNIFFER_WATCH
+    #ifdef WOLFSSL_SNIFFER_WATCH
     ssl_SetWatchKeyCallback(myWatchCb, err);
-#endif
-#ifdef WOLFSSL_SNIFFER_STORE_DATA_CB
+    #endif
+    #ifdef WOLFSSL_SNIFFER_STORE_DATA_CB
     ssl_SetStoreDataCallback(myStoreDataCb);
+    #endif
 #endif
 
     if (argc == 1) {
@@ -756,8 +1131,6 @@ int main(int argc, char** argv)
             ret = -1;
         }
         else {
-            const char* passwd = NULL;
-
             /* defaults for server and port */
             port = 443;
             server = "127.0.0.1";
@@ -802,36 +1175,48 @@ int main(int argc, char** argv)
     if (pcap_datalink(pcap) == DLT_NULL)
         frame = NULL_IF_FRAME_LEN;
 
+#ifdef THREADED_SNIFFTEST
+    XMEMSET(used, 0, sizeof(used));
+    XMEMSET(&workers, 0, sizeof(workers));
+
+    for (i=0; i<workerThreadCount; i++) {
+        ssl_Init_SnifferWorker(&workers[i], port, server, keyFilesSrc,
+                               passwd, i);
+        pthread_create(&workers[i].tid, NULL, snifferWorker, &workers[i]);
+    }
+#endif
+
     while (1) {
         struct pcap_pkthdr header;
         const unsigned char* packet = NULL;
-        SSLInfo sslInfo;
-        void* chain = NULL;
-        int   chainSz = 0;
         byte* data = NULL; /* pointer to decrypted data */
-#ifdef WOLFSSL_ASYNC_CRYPT
-        int queueSz = 0;
+#ifdef THREADED_SNIFFTEST
+        SnifferStreamInfo info;
+        uint8_t  infoSum;
+        uint8_t* infoPtr;
+        int      threadNum;
 #endif
+#if defined(WOLFSSL_ASYNC_CRYPT)
+        SSLInfo sslInfo;
+        int     queueSz = 0;
 
-#ifndef WOLFSSL_ASYNC_CRYPT
-        ret = 0; /* reset status */
-#else
+        XMEMSET(&sslInfo, 0, sizeof(sslInfo));
         /* poll hardware and attempt to process items in queue. If returns > 0
          * then data pointer has decrypted something */
-        ret = SnifferAsyncPollQueue(&data, err, &sslInfo, &queueSz);
+        SnifferAsyncPollQueue(&data, err, &sslInfo, &queueSz);
         if (queueSz >= WOLF_ASYNC_MAX_PENDING) {
             /* queue full, poll again */
             continue;
         }
 #endif
+        ret = 0; /* reset status */
+
         if (data == NULL) {
-            /* grab next pcap packet */
+        /* grab next pcap packet */
             packetNumber++;
             packet = pcap_next(pcap, &header);
-        #ifdef DEBUG_SNIFFER
-            printf("Packet Number: %d\n", packetNumber);
-        #endif
         }
+
         if (packet) {
             if (header.caplen > 40)  { /* min ip(20) + min tcp(20) */
                 packet        += frame;
@@ -841,67 +1226,45 @@ int main(int argc, char** argv)
                 /* packet doesn't contain minimum ip/tcp header */
                 continue;
             }
+#ifdef THREADED_SNIFFTEST
+            XMEMSET(&info, 0, sizeof(SnifferStreamInfo));
 
-#ifdef WOLFSSL_SNIFFER_CHAIN_INPUT
-            isChain = 1;
-            j = 0;
-            remainder = header.caplen;
-            chainSz = 0;
-            do {
-                unsigned int chunkSz = min(remainder, CHAIN_INPUT_CHUNK_SIZE);
-                chains[chainSz].iov_base = (void*)(packet + j);
-                chains[chainSz].iov_len = chunkSz;
-                j += chunkSz;
-                remainder -= chunkSz;
-                chainSz++;
-            } while (j < (int)header.caplen);
-            chain = (void*)chains;
-#else
-            chain = (void*)packet;
-            chainSz = header.caplen;
-#endif
+            ret = ssl_DecodePacket_GetStream(&info, packet, header.caplen, err);
 
-#ifdef WOLFSSL_ASYNC_CRYPT
-            /* For async call the original API again with same data,
-             * or call with different sessions for multiple concurrent
-             * stream processing */
-            ret = ssl_DecodePacketAsync(chain, chainSz, isChain, &data, err,
-                &sslInfo, NULL);
-
-            /* WC_PENDING_E: Hardware is processing or stream is blocked
-             *               (waiting on WC_PENDING_E) */
-            if (ret == WC_PENDING_E) {
-                /* add to queue, for later processing */
-            #ifdef DEBUG_SNIFFER
-                printf("Steam is pending, queue packet %d\n", packetNumber);
-            #endif
-                ret = SnifferAsyncQueueAdd(ret, chain, chainSz, isChain,
-                    packetNumber);
-                if (ret >= 0) {
-                    ret = 0; /* mark event just added */
-                }
+            /* calculate SnifferStreamInfo checksum */
+            infoSum = 0;
+            infoPtr = (uint8_t*)&info;
+
+            for (i=0; i<(int)sizeof(SnifferStreamInfo); i++) {
+                infoSum += infoPtr[i];
             }
 
-#elif defined(WOLFSSL_SNIFFER_CHAIN_INPUT) && \
-      defined(WOLFSSL_SNIFFER_STORE_DATA_CB)
-            ret = ssl_DecodePacketWithChainSessionInfoStoreData(chain, chainSz,
-                    &data, &sslInfo, err);
-#elif defined(WOLFSSL_SNIFFER_CHAIN_INPUT)
-            (void)sslInfo;
-            ret = ssl_DecodePacketWithChain(chain, chainSz, &data, err);
+            /* determine thread to handle stream */
+            threadNum = infoSum % workerThreadCount;
+            used[threadNum] = 1;
+        #ifdef DEBUG_SNIFFER
+            printf("Sending packet %d to thread number %d\n", packetNumber,
+                    threadNum);
+        #endif
+
+            /* get lock on thread mutex */
+            wm_SemLock(&workers[threadNum].sem);
+
+            /* add the packet to the worker's linked list */
+            if (SnifferWorkerPacketAdd(&workers[threadNum], ret, (byte*)packet,
+                                   header.caplen, packetNumber)) {
+                printf("Unable to add packet %d to worker", packetNumber);
+                break;
+            }
+
+            wm_SemUnlock(&workers[threadNum].sem);
 #else
-    #if defined(WOLFSSL_SNIFFER_STORE_DATA_CB)
-            ret = ssl_DecodePacketWithSessionInfoStoreData(packet,
-                    header.caplen, &data, &sslInfo, err);
-    #else
-            ret = ssl_DecodePacketWithSessionInfo(packet, header.caplen, &data,
-                                                  &sslInfo, err);
-    #endif
-            (void)chain;
-            (void)chainSz;
+            /* Decode Packet, ret value will indicate whether a
+             * bad packet was encountered */
+            hadBadPacket = DecodePacket((byte*)packet, header.caplen,
+                                        packetNumber,err);
 #endif
         }
-
         /* check if we are done reading file */
         if (packet == NULL && data == NULL && saveFile) {
         #ifdef WOLFSSL_ASYNC_CRYPT
@@ -912,23 +1275,25 @@ int main(int argc, char** argv)
             break;
         }
 
-        if (ret < 0) {
-            printf("ssl_Decode ret = %d, %s\n", ret, err);
-            hadBadPacket = 1;
-        }
-        if (data != NULL && ret > 0) {
-            /* Convert non-printable data to periods. */
-            for (j = 0; j < ret; j++) {
-                if (isprint(data[j]) || isspace(data[j])) continue;
-                data[j] = '.';
-            }
-            data[ret] = 0;
-            printf("SSL App Data(%d:%d):%s\n", packetNumber, ret, data);
-            ssl_FreeZeroDecodeBuffer(&data, ret, err);
+    }
+
+#ifdef THREADED_SNIFFTEST
+    for (i=0; i<workerThreadCount; i++) {
+        workers[i].shutdown = 1;
+        if (used[i] == 0)
+            workers[i].unused = 1;
+        pthread_join(workers[i].tid, NULL);
+    }
+
+    for (i=0; i<workerThreadCount; i++) {
+        if (workers[i].hadBadPacket) {
+           hadBadPacket = 1;
         }
+        ssl_Free_SnifferWorker(&workers[i]);
     }
+#endif
+
     FreeAll();
-    (void)isChain;
 
     return hadBadPacket ? EXIT_FAILURE : EXIT_SUCCESS;
 }

+ 1 - 2
wolfcrypt/src/wc_port.c

@@ -1061,8 +1061,7 @@ size_t wc_strlcat(char *dst, const char *src, size_t dstSize)
 }
 #endif /* USE_WOLF_STRLCAT */
 
-#ifndef SINGLE_THREADED
-/* TODO: use atomic operations instead of mutex */
+#if !defined(SINGLE_THREADED) && !defined(HAVE_C___ATOMIC)
 void wolfSSL_RefInit(wolfSSL_Ref* ref, int* err)
 {
     int ret = wc_InitMutex(&ref->mutex);

+ 21 - 0
wolfssl/sniffer.h

@@ -47,6 +47,22 @@
     extern "C" {
 #endif
 
+
+typedef struct IpAddrInfo {
+    int version;
+    union {
+        word32 ip4;
+        byte   ip6[16];
+    };
+} IpAddrInfo;
+
+typedef struct SnifferStreamInfo {
+    IpAddrInfo src;          /* server address in network byte order */
+    IpAddrInfo dst;          /* client address in network byte order */
+    word16            dstPort;         /* server port */
+    word16            srcPort;         /* client port */
+} SnifferStreamInfo;
+
 /* @param typeK: (formerly keyType) was shadowing a global declaration in
  *                wolfssl/wolfcrypt/asn.h line 175
  */
@@ -128,6 +144,8 @@ WOLFSSL_API
 SSL_SNIFFER_API void ssl_InitSniffer(void);
 WOLFSSL_API
 SSL_SNIFFER_API void ssl_InitSniffer_ex(int devId);
+WOLFSSL_API
+SSL_SNIFFER_API void ssl_InitSniffer_ex2(int threadNum);
 
 WOLFSSL_API
 SSL_SNIFFER_API void ssl_FreeSniffer(void);
@@ -278,6 +296,9 @@ SSL_SNIFFER_API int ssl_DecodePacketWithChainSessionInfoStoreData(
         char* error);
 #endif
 
+WOLFSSL_API
+SSL_SNIFFER_API int ssl_DecodePacket_GetStream(SnifferStreamInfo* info,
+        const byte* packet, int length, char* error);
 
 #ifdef WOLFSSL_ASYNC_CRYPT
 

+ 31 - 24
wolfssl/wolfcrypt/wc_port.h

@@ -284,37 +284,44 @@
 
 /* Reference counting. */
 typedef struct wolfSSL_Ref {
-/* TODO: use atomic operations instead of mutex. */
-#ifndef SINGLE_THREADED
+#if !defined(SINGLE_THREADED) && !defined(HAVE_C___ATOMIC)
     wolfSSL_Mutex mutex;
 #endif
     int count;
 } wolfSSL_Ref;
 
 #ifdef SINGLE_THREADED
-#define wolfSSL_RefInit(ref, err)           \
-    do {                                    \
-        (ref)->count = 1;                   \
-        *(err) = 0;                         \
-    }                                       \
-    while (0)
-
+#define wolfSSL_RefInit(ref, err) do {      \
+    (ref)->count = 1;                       \
+    *(err) = 0;                             \
+} while(0);
 #define wolfSSL_RefFree(ref)
-
-#define wolfSSL_RefInc(ref, err)            \
-    do {                                    \
-        (ref)->count++;                     \
-        *(err) = 0;                         \
-    }                                       \
-    while (0)
-
-#define wolfSSL_RefDec(ref, isZero, err)    \
-    do {                                    \
-        (ref)->count--;                     \
-        *(isZero) = ((ref)->count == 0);    \
-        *(err) = 0;                         \
-    }                                       \
-    while (0)
+#define wolfSSL_RefInc(ref, err) do {      \
+    (ref)->count++;                        \
+    *(err) = 0;                            \
+} while(0);
+#define wolfSSL_RefDec(ref, isZero, err) do { \
+    (ref)->count--;                           \
+    *(isZero) = ((ref)->count == 0);          \
+    *(err) = 0;                               \
+} while(0);
+#elif defined(HAVE_C___ATOMIC)
+#define wolfSSL_RefInit(ref, err) do {      \
+    (ref)->count = 1;                       \
+    *(err) = 0;                             \
+} while(0);
+#define wolfSSL_RefFree(ref)
+#define wolfSSL_RefInc(ref, err)  do {      \
+    __atomic_fetch_add(&(ref)->count, 1,    \
+        __ATOMIC_RELAXED);                  \
+    *(err) = 0;                             \
+} while(0);
+#define wolfSSL_RefDec(ref, isZero, err) do { \
+    __atomic_fetch_sub(&(ref)->count, 1,      \
+        __ATOMIC_RELAXED);                    \
+    *(isZero) = ((ref)->count == 0);          \
+    *(err) = 0;                               \
+} while(0);
 #else
 WOLFSSL_LOCAL void wolfSSL_RefInit(wolfSSL_Ref* ref, int* err);
 WOLFSSL_LOCAL void wolfSSL_RefFree(wolfSSL_Ref* ref);